工作流
"Workflow"(工作流)是一种可选方式,用于将多个任务组合在一起,并能够从失败点优雅地重试。
当你有一系列连续任务时,工作流特别有用,它可以配置每个任务在失败时能够重试。
如果工作流中的某个任务失败,工作流会自动"从失败处继续",不会重新执行任何已经执行过的先前任务。
定义工作流
工作流最重要的部分是 handler
,在这里你可以通过简单地调用 runTask
函数来声明任务应该在何时以及如何运行。如果工作流中的任何任务失败,整个 handler
函数将会重新运行。
但重要的是,已经成功完成的任务只会返回缓存和保存的输出,而不会再次运行。工作流会从失败点恢复,只有从失败点开始的任务会被重新执行。
要定义一个基于 JS 的工作流,只需在 Payload 配置的 jobs.workflows
数组中添加一个工作流。工作流包含以下字段:
选项 | 描述 |
---|---|
slug | 为工作流定义一个基于 slug 的名称。该 slug 在任务和工作流中必须是唯一的。 |
handler | 负责运行工作流的函数。你可以传递一个基于字符串的工作流函数文件路径,或者直接传递工作流任务函数本身。如果你在工作流中使用大型依赖项,可能更倾向于传递字符串路径,这样可以避免在 Next.js 应用中捆绑大型依赖项。传递字符串路径是一个高级功能,可能需要复杂的构建管道才能工作。 |
inputSchema | 定义输入字段的 schema - Payload 会为这个 schema 生成类型。 |
interfaceName | 你可以使用 interfaceName 来更改为此工作流生成的接口名称。默认情况下,这是 "Workflow" + 首字母大写的 workflow slug。 |
label | 为工作流定义一个用户友好的标签。 |
queue | 可选地,定义此工作流应绑定的队列名称。默认为 "default"。 |
retries | 你可以在工作流级别定义 retries ,这将强制工作流最多只能失败指定次数的重试。如果任务没有指定重试次数,它将继承工作流上指定的重试计数。你可以将 workflow 重试指定为 0 ,这将忽略所有 task 的重试规范,并在任何任务失败时使整个工作流失败。你可以将 workflow 重试保留为 undefined,在这种情况下,工作流将尊重每个任务自己定义的重试计数。默认情况下这是 undefined,意味着工作流重试由其任务定义 |
示例:
export default buildConfig({
// ...
jobs: {
tasks: [
// ...
]
workflows: [
{
slug: 'createPostAndUpdate',
// 工作流将接受的参数
inputSchema: [
{
name: 'title',
type: 'text',
required: true,
},
],
// 定义工作流"控制流"的 handler
// 注意它如何使用 `tasks` 参数来执行你预定义的任务。
// 这些都是强类型的!
handler: async ({ job, tasks }) => {
// 这个工作流首先运行一个名为 `createPost` 的任务。
// 你需要为此任务调用定义一个唯一的 ID
// 如果此工作流失败并在将来重新执行,该 ID 将始终保持相同。这里我们硬编码为 '1'
const output = await tasks.createPost('1', {
input: {
title: job.input.title,
},
})
// 一旦前一个任务完成,它将运行一个名为 `updatePost` 的任务
await tasks.updatePost('2', {
input: {
post: job.taskStatus.createPost['1'].output.postID, // 或 output.postID
title: job.input.title + '2',
},
})
},
} as WorkflowConfig<'updatePost'>
]
}
})
内联运行任务
在上面的示例中,我们的工作流执行的是已在 Payload 配置中预定义的任务。但你也可以运行未预先定义的任务。
为此,你可以使用 inlineTask
函数。
这种方法的缺点是任务不能轻松地在不同工作流中复用,而且存储在 job 中的任务数据将没有类型定义。在下面的示例中,内联任务数据会存储在 job.taskStatus.inline['2']
下,但完全没有类型定义,因为这类动态任务的类型无法预先生成。
示例:
export default buildConfig({
// ...
jobs: {
tasks: [
// ...
]
workflows: [
{
slug: 'createPostAndUpdate',
inputSchema: [
{
name: 'title',
type: 'text',
required: true,
},
],
handler: async ({ job, tasks, inlineTask }) => {
// 这里我们运行一个预定义任务
// `createPost` 处理函数的参数和返回类型
// 都是强类型的
const output = await tasks.createPost('1', {
input: {
title: job.input.title,
},
})
// 这里的任务没有在 Payload 配置中定义
// 而是"内联"的。它的输出会存储在数据库的 Job 中
// 但它的参数将没有类型定义
const { newPost } = await inlineTask('2', {
task: async ({ req }) => {
const newPost = await req.payload.update({
collection: 'post',
id: '2',
req,
retries: 3,
data: {
title: 'updated!',
},
})
return {
output: {
newPost
},
}
},
})
},
} as WorkflowConfig<'updatePost'>
]
}
})