工作流

"Workflow"(工作流)是一种可选方式,用于将多个任务组合在一起,并能够从失败点优雅地重试。

当你有一系列连续任务时,工作流特别有用,它可以配置每个任务在失败时能够重试。

如果工作流中的某个任务失败,工作流会自动"从失败处继续",不会重新执行任何已经执行过的先前任务

定义工作流

工作流最重要的部分是 handler,在这里你可以通过简单地调用 runTask 函数来声明任务应该在何时以及如何运行。如果工作流中的任何任务失败,整个 handler 函数将会重新运行。

但重要的是,已经成功完成的任务只会返回缓存和保存的输出,而不会再次运行。工作流会从失败点恢复,只有从失败点开始的任务会被重新执行。

要定义一个基于 JS 的工作流,只需在 Payload 配置的 jobs.workflows 数组中添加一个工作流。工作流包含以下字段:

选项描述
slug为工作流定义一个基于 slug 的名称。该 slug 在任务和工作流中必须是唯一的。
handler负责运行工作流的函数。你可以传递一个基于字符串的工作流函数文件路径,或者直接传递工作流任务函数本身。如果你在工作流中使用大型依赖项,可能更倾向于传递字符串路径,这样可以避免在 Next.js 应用中捆绑大型依赖项。传递字符串路径是一个高级功能,可能需要复杂的构建管道才能工作。
inputSchema定义输入字段的 schema - Payload 会为这个 schema 生成类型。
interfaceName你可以使用 interfaceName 来更改为此工作流生成的接口名称。默认情况下,这是 "Workflow" + 首字母大写的 workflow slug。
label为工作流定义一个用户友好的标签。
queue可选地,定义此工作流应绑定的队列名称。默认为 "default"。
retries你可以在工作流级别定义 retries,这将强制工作流最多只能失败指定次数的重试。如果任务没有指定重试次数,它将继承工作流上指定的重试计数。你可以将 workflow 重试指定为 0,这将忽略所有 task 的重试规范,并在任何任务失败时使整个工作流失败。你可以将 workflow 重试保留为 undefined,在这种情况下,工作流将尊重每个任务自己定义的重试计数。默认情况下这是 undefined,意味着工作流重试由其任务定义

示例:

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ]
    workflows: [
      {
        slug: 'createPostAndUpdate',

        // 工作流将接受的参数
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // 定义工作流"控制流"的 handler
        // 注意它如何使用 `tasks` 参数来执行你预定义的任务。
        // 这些都是强类型的!
        handler: async ({ job, tasks }) => {

          // 这个工作流首先运行一个名为 `createPost` 的任务。

          // 你需要为此任务调用定义一个唯一的 ID
          // 如果此工作流失败并在将来重新执行,该 ID 将始终保持相同。这里我们硬编码为 '1'
          const output = await tasks.createPost('1', {
            input: {
              title: job.input.title,
            },
          })

          // 一旦前一个任务完成,它将运行一个名为 `updatePost` 的任务
          await tasks.updatePost('2', {
            input: {
              post: job.taskStatus.createPost['1'].output.postID, // 或 output.postID
              title: job.input.title + '2',
            },
          })
        },
      } as WorkflowConfig<'updatePost'>
    ]
  }
})

内联运行任务

在上面的示例中,我们的工作流执行的是已在 Payload 配置中预定义的任务。但你也可以运行未预先定义的任务。

为此,你可以使用 inlineTask 函数。

这种方法的缺点是任务不能轻松地在不同工作流中复用,而且存储在 job 中的任务数据将没有类型定义。在下面的示例中,内联任务数据会存储在 job.taskStatus.inline['2'] 下,但完全没有类型定义,因为这类动态任务的类型无法预先生成。

示例:

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ]
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, tasks, inlineTask }) => {
          // 这里我们运行一个预定义任务
          // `createPost` 处理函数的参数和返回类型
          // 都是强类型的
          const output = await tasks.createPost('1', {
            input: {
              title: job.input.title,
            },
          })

          // 这里的任务没有在 Payload 配置中定义
          // 而是"内联"的。它的输出会存储在数据库的 Job 中
          // 但它的参数将没有类型定义
          const { newPost } = await inlineTask('2', {
            task: async ({ req }) => {
              const newPost = await req.payload.update({
                collection: 'post',
                id: '2',
                req,
                retries: 3,
                data: {
                  title: 'updated!',
                },
              })
              return {
                output: {
                  newPost
                },
              }
            },
          })
        },
      } as WorkflowConfig<'updatePost'>
    ]
  }
})