任务

"Task" 是一个执行业务逻辑的函数定义,其输入和输出都具有强类型约束。

你可以在 Payload 配置中注册 Tasks,然后创建使用它们的 JobsWorkflows。可以将 Tasks 视为整洁、独立的"只做一件特定事情的函数"。

Payload Tasks 可以配置为在失败时自动重试,这使得它们对于"持久性"工作流(如 AI 应用程序)非常有价值,在这些应用中,LLM 可能返回非确定性结果,可能需要重试。

Tasks 可以在 Payload 配置的 jobs.tasks 数组中定义,也可以在工作流中内联定义。

在配置中定义任务

只需在 Payload 配置的 jobs.tasks 数组中添加任务即可。一个任务包含以下字段:

选项描述
slug为任务定义一个基于 slug 的名称。该 slug 在所有任务和工作流中必须是唯一的。
handler负责运行任务的函数。你可以传递任务函数文件的字符串路径,或者直接传递任务函数本身。如果任务中使用了大体积依赖项,建议传递字符串路径,这样可以避免将这些大依赖项打包到 Next.js 应用中。传递字符串路径是一个高级功能,可能需要复杂的构建管道才能正常工作。
inputSchema定义输入字段的 schema - Payload 会为此 schema 生成类型。
interfaceName可以使用 interfaceName 来更改为此任务生成的接口名称。默认为 "Task" + 首字母大写的任务 slug。
outputSchema定义输出字段的 schema - Payload 会为此 schema 生成类型。
label为此任务定义一个人性化的标签。
onFail任务失败时执行的函数。
onSuccess任务成功时执行的函数。
retries指定任务失败时应重试的次数。如果未定义,任务将继承工作流的重试次数或不重试。如果设为 0,任务将不会重试。默认为 undefined。

任务的逻辑在 handler 中定义 - 可以是一个函数,也可以是函数的路径。当 worker 接收到包含此任务的 Job 时,handler 就会运行。

它应该返回一个包含 output 键的对象,其中应包含你定义的任务输出。

示例:

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        // 配置此任务自动重试最多两次
        retries: 2,

        // 这是任务的唯一标识符
        slug: 'createPost',

        // 这是任务接受的参数
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // 这是函数应该输出的属性
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],

        // 这是任务被调用时运行的函数
        handler: async ({ input, job, req }) => {
          const newPost = await req.payload.create({
            collection: 'post',
            req,
            data: {
              title: input.title,
            },
          })
          return {
            output: {
              postID: newPost.id,
            },
          }
        },
      } as TaskConfig<'createPost'>,
    ],
  },
})

除了直接在 Payload 配置中提供处理函数外,你还可以传递处理函数定义的_绝对路径_。如果你的任务有大体积依赖项,并且计划在可以访问文件系统的单独进程中执行任务,这可能是一种确保 Payload + Next.js 应用保持快速编译且依赖项最少的有用方法。

请注意这是一个高级功能,可能需要复杂的构建管道,特别是在生产环境或 Next.js 中使用时,例如通过调用 /api/payload-jobs/run 端点。你需要单独转译处理程序文件,并确保在任务运行时它们位于相同位置。如果使用端点来执行任务,建议直接在 Payload 配置中定义处理函数,或者在 Next.js 之外使用导入路径的处理程序。

一般来说,这是一个高级用例。示例如下:

payload.config.ts:

import { fileURLToPath } from 'node:url'
import path from 'path'

const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)

export default buildConfig({
  jobs: {
    tasks: [
      {
        // ...
        // #createPostHandler 是 `createPost.ts` 文件中的一个命名导出
        handler:
          path.resolve(dirname, 'src/tasks/createPost.ts') +
          '#createPostHandler',
      },
    ],
  },
})

然后,createPost 文件本身:

src/tasks/createPost.ts:

import type { TaskHandler } from 'payload'

export const createPostHandler: TaskHandler<'createPost'> = async ({
  input,
  job,
  req,
}) => {
  const newPost = await req.payload.create({
    collection: 'post',
    req,
    data: {
      title: input.title,
    },
  })
  return {
    output: {
      postID: newPost.id,
    },
  }
}

配置任务恢复

默认情况下,如果任务之前已成功执行且工作流重新运行,该任务将不会再次执行。相反,系统会返回之前任务运行的输出结果。这是为了避免对已成功任务进行不必要的重新运行。

你可以通过 retries.shouldRestore 属性来配置此行为。该属性接受布尔值或函数。

如果 shouldRestore 设置为 true,则仅当任务之前失败时才会重新运行。这是默认行为。

如果 shouldRestore 设置为 false,即使任务之前已成功,也会重新运行,并忽略最大重试次数限制。

如果 shouldRestore 是一个函数,函数的返回值将决定是否应重新运行任务。这可用于实现更复杂的恢复逻辑,例如你可能希望任务最多重试 X 次,然后在后续运行中恢复它,或者仅当输入发生变化时才重新运行任务。

示例:

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        slug: 'myTask',
        retries: {
          shouldRestore: false,
        },
        // ...
      } as TaskConfig<'myTask'>,
    ],
  },
})

示例 - 根据输入数据决定是否应恢复任务:

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        slug: 'myTask',
        inputSchema: [
          {
            name: 'someDate',
            type: 'date',
            required: true,
          },
        ],
        retries: {
          shouldRestore: ({ input }) => {
            if (new Date(input.someDate) > new Date()) {
              return false
            }
            return true
          },
        },
        // ...
      } as TaskConfig<'myTask'>,
    ],
  },
})

嵌套任务

你可以在现有任务中运行子任务,通过使用传递给任务 handler 函数的 tasksinlineTask 参数:

export default buildConfig({
  // ...
  jobs: {
    // 当使用嵌套任务时,建议将 `addParentToTaskLog` 设置为 `true`,这样父任务会被包含在任务日志中
    // 这可以更好地观察和调试任务执行过程
    addParentToTaskLog: true,
    tasks: [
      {
        slug: 'parentTask',
        inputSchema: [
          {
            name: 'text',
            type: 'text',
          },
        ],
        handler: async ({ input, req, tasks, inlineTask }) => {
          await inlineTask('Sub Task 1', {
            task: () => {
              // 执行某些操作
              return {
                output: {},
              }
            },
          })

          await tasks.CreateSimple('Sub Task 2', {
            input: { message: 'hello' },
          })

          return {
            output: {},
          }
        },
      } as TaskConfig<'parentTask'>,
    ],
  },
})