任务
"Task" 是一个执行业务逻辑的函数定义,其输入和输出都具有强类型约束。
你可以在 Payload 配置中注册 Tasks,然后创建使用它们的 Jobs 或 Workflows。可以将 Tasks 视为整洁、独立的"只做一件特定事情的函数"。
Payload Tasks 可以配置为在失败时自动重试,这使得它们对于"持久性"工作流(如 AI 应用程序)非常有价值,在这些应用中,LLM 可能返回非确定性结果,可能需要重试。
Tasks 可以在 Payload 配置的 jobs.tasks
数组中定义,也可以在工作流中内联定义。
在配置中定义任务
只需在 Payload 配置的 jobs.tasks
数组中添加任务即可。一个任务包含以下字段:
选项 | 描述 |
---|---|
slug | 为任务定义一个基于 slug 的名称。该 slug 在所有任务和工作流中必须是唯一的。 |
handler | 负责运行任务的函数。你可以传递任务函数文件的字符串路径,或者直接传递任务函数本身。如果任务中使用了大体积依赖项,建议传递字符串路径,这样可以避免将这些大依赖项打包到 Next.js 应用中。传递字符串路径是一个高级功能,可能需要复杂的构建管道才能正常工作。 |
inputSchema | 定义输入字段的 schema - Payload 会为此 schema 生成类型。 |
interfaceName | 可以使用 interfaceName 来更改为此任务生成的接口名称。默认为 "Task" + 首字母大写的任务 slug。 |
outputSchema | 定义输出字段的 schema - Payload 会为此 schema 生成类型。 |
label | 为此任务定义一个人性化的标签。 |
onFail | 任务失败时执行的函数。 |
onSuccess | 任务成功时执行的函数。 |
retries | 指定任务失败时应重试的次数。如果未定义,任务将继承工作流的重试次数或不重试。如果设为 0,任务将不会重试。默认为 undefined。 |
任务的逻辑在 handler
中定义 - 可以是一个函数,也可以是函数的路径。当 worker 接收到包含此任务的 Job 时,handler
就会运行。
它应该返回一个包含 output
键的对象,其中应包含你定义的任务输出。
示例:
export default buildConfig({
// ...
jobs: {
tasks: [
{
// 配置此任务自动重试最多两次
retries: 2,
// 这是任务的唯一标识符
slug: 'createPost',
// 这是任务接受的参数
inputSchema: [
{
name: 'title',
type: 'text',
required: true,
},
],
// 这是函数应该输出的属性
outputSchema: [
{
name: 'postID',
type: 'text',
required: true,
},
],
// 这是任务被调用时运行的函数
handler: async ({ input, job, req }) => {
const newPost = await req.payload.create({
collection: 'post',
req,
data: {
title: input.title,
},
})
return {
output: {
postID: newPost.id,
},
}
},
} as TaskConfig<'createPost'>,
],
},
})
除了直接在 Payload 配置中提供处理函数外,你还可以传递处理函数定义的_绝对路径_。如果你的任务有大体积依赖项,并且计划在可以访问文件系统的单独进程中执行任务,这可能是一种确保 Payload + Next.js 应用保持快速编译且依赖项最少的有用方法。
请注意这是一个高级功能,可能需要复杂的构建管道,特别是在生产环境或 Next.js 中使用时,例如通过调用 /api/payload-jobs/run
端点。你需要单独转译处理程序文件,并确保在任务运行时它们位于相同位置。如果使用端点来执行任务,建议直接在 Payload 配置中定义处理函数,或者在 Next.js 之外使用导入路径的处理程序。
一般来说,这是一个高级用例。示例如下:
payload.config.ts:
import { fileURLToPath } from 'node:url'
import path from 'path'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
export default buildConfig({
jobs: {
tasks: [
{
// ...
// #createPostHandler 是 `createPost.ts` 文件中的一个命名导出
handler:
path.resolve(dirname, 'src/tasks/createPost.ts') +
'#createPostHandler',
},
],
},
})
然后,createPost
文件本身:
src/tasks/createPost.ts:
import type { TaskHandler } from 'payload'
export const createPostHandler: TaskHandler<'createPost'> = async ({
input,
job,
req,
}) => {
const newPost = await req.payload.create({
collection: 'post',
req,
data: {
title: input.title,
},
})
return {
output: {
postID: newPost.id,
},
}
}
配置任务恢复
默认情况下,如果任务之前已成功执行且工作流重新运行,该任务将不会再次执行。相反,系统会返回之前任务运行的输出结果。这是为了避免对已成功任务进行不必要的重新运行。
你可以通过 retries.shouldRestore
属性来配置此行为。该属性接受布尔值或函数。
如果 shouldRestore
设置为 true,则仅当任务之前失败时才会重新运行。这是默认行为。
如果 shouldRestore
设置为 false,即使任务之前已成功,也会重新运行,并忽略最大重试次数限制。
如果 shouldRestore
是一个函数,函数的返回值将决定是否应重新运行任务。这可用于实现更复杂的恢复逻辑,例如你可能希望任务最多重试 X 次,然后在后续运行中恢复它,或者仅当输入发生变化时才重新运行任务。
示例:
export default buildConfig({
// ...
jobs: {
tasks: [
{
slug: 'myTask',
retries: {
shouldRestore: false,
},
// ...
} as TaskConfig<'myTask'>,
],
},
})
示例 - 根据输入数据决定是否应恢复任务:
export default buildConfig({
// ...
jobs: {
tasks: [
{
slug: 'myTask',
inputSchema: [
{
name: 'someDate',
type: 'date',
required: true,
},
],
retries: {
shouldRestore: ({ input }) => {
if (new Date(input.someDate) > new Date()) {
return false
}
return true
},
},
// ...
} as TaskConfig<'myTask'>,
],
},
})
嵌套任务
你可以在现有任务中运行子任务,通过使用传递给任务 handler
函数的 tasks
或 inlineTask
参数:
export default buildConfig({
// ...
jobs: {
// 当使用嵌套任务时,建议将 `addParentToTaskLog` 设置为 `true`,这样父任务会被包含在任务日志中
// 这可以更好地观察和调试任务执行过程
addParentToTaskLog: true,
tasks: [
{
slug: 'parentTask',
inputSchema: [
{
name: 'text',
type: 'text',
},
],
handler: async ({ input, req, tasks, inlineTask }) => {
await inlineTask('Sub Task 1', {
task: () => {
// 执行某些操作
return {
output: {},
}
},
})
await tasks.CreateSimple('Sub Task 2', {
input: { message: 'hello' },
})
return {
output: {},
}
},
} as TaskConfig<'parentTask'>,
],
},
})