队列

队列是 Payload 的 Jobs Queue 功能的最后一个环节,负责处理如何_运行你的作业_。到目前为止,我们只讨论了如何将作业加入队列,但还没有实际运行任何作业。

队列是一组按添加顺序执行的作业集合。

当你准备运行作业时,Payload 会查询队列中的所有作业并执行它们。默认情况下,所有排队的作业都会被添加到 default 队列中。

但是,假设你希望某些作业每晚运行,而其他作业每五分钟运行一次。

通过在 payload.jobs.queue() 方法中指定 queue 名称,你可以将特定作业加入 queue: 'nightly' 队列,而其他作业则保留在默认队列中。

这样,你就可以配置两种不同的运行策略:

  1. 一个每晚运行的 cron 任务,查询 nightly 队列中的作业
  2. 另一个每约5分钟运行一次的任务,处理 default 队列中的作业

执行作业

如前所述,你可以将作业加入队列,但除非有工作线程获取并运行这些作业,否则它们不会自动执行。可以通过四种方式实现:

定时任务

你可以使用 jobs.autoRun 属性来配置定时任务:

export default buildConfig({
  // 其他配置...
  jobs: {
    tasks: [
      // 你的任务放在这里
    ],
    // autoRun 也可以是一个接收 `payload` 参数的函数
    autoRun: [
      {
        cron: '0 * * * *', // 每小时的第0分钟执行
        limit: 100, // 每次运行处理的任务数量限制
        queue: 'hourly', // 队列名称
      },
      // 可以添加任意数量的定时任务
    ],
    shouldAutoRun: async (payload) => {
      // 告诉 Payload 是否应该运行任务
      // 每次 Payload 准备执行任务时都会调用此函数
      // 如果此函数返回 false,定时任务将被停止
      return true
    },
  },
})

autoRun 功能适用于持续运行的专用服务器, 不应在 Vercel 等无服务器平台上使用。

端点

你可以通过向 /api/payload-jobs/run 端点发送 fetch 请求来执行任务:

// 这里我们指定本次调用只运行100个任务,
// 并且从 `nightly` 队列中获取任务:
await fetch('/api/payload-jobs/run?limit=100&queue=nightly', {
  method: 'GET',
  headers: {
    Authorization: `Bearer ${token}`,
  },
})

这个端点会自动为你挂载,在与 Vercel 等无服务器平台配合使用时特别有用,你可以使用 Vercel Cron 来调用一个执行任务的 serverless 函数。

Vercel Cron 示例

如果你在 Vercel 上部署,可以在项目根目录添加一个 vercel.json 文件,配置 Vercel Cron 按照计划任务时间调用 run 端点。

以下是该文件的示例配置:

{
  "crons": [
    {
      "path": "/api/payload-jobs/run",
      "schedule": "*/5 * * * *"
    }
  ]
}

上述配置会安排 /api/payload-jobs/run 端点每 5 分钟被调用一次。

最后一步是保护你的 run 端点,确保只有授权用户才能调用运行器。

为此,你可以在 Vercel 项目中设置一个名为 CRON_SECRET 的环境变量,它应该是一个随机字符串——理想情况下 16 个字符或更长。

然后,你可以修改运行作业的 access 函数,确保只有 Vercel 能调用你的运行器。

export default buildConfig({
  // 其他配置...
  jobs: {
    access: {
      run: ({ req }: { req: PayloadRequest }): boolean => {
        // 允许登录用户执行此端点(默认)
        if (req.user) return true

        // 如果没有登录用户,则检查
        // Vercel Cron 密钥是否作为
        // Authorization 请求头存在:
        const authHeader = req.headers.get('authorization')
        return authHeader === `Bearer ${process.env.CRON_SECRET}`
      },
    },
    // 其他作业配置...
  },
})

这种方式有效是因为当 Vercel Cron 触发时,Vercel 会自动将 CRON_SECRET 环境变量作为 Authorization 请求头提供给端点,确保作业可以安全运行。

项目部署到 Vercel 后,Vercel Cron 作业会自动按照指定计划触发 /api/payload-jobs/run 端点,在后台运行队列中的作业。

本地 API

如果你想通过服务器端代码以编程方式处理作业,可以使用 Local API:

运行所有任务:

const results = await payload.jobs.run()

// 可以通过参数自定义队列名称和限制数量:
await payload.jobs.run({ queue: 'nightly', limit: 100 })

// 可以提供 where 条件来筛选需要运行的任务:
await payload.jobs.run({
  where: { 'input.message': { equals: 'secret' } },
})

运行单个任务:

const results = await payload.jobs.runByID({
  id: myJobID,
})

命令行脚本

最后,你可以使用 Payload 开箱即带的命令行脚本来处理任务。

npx payload jobs:run --queue default --limit 10

此外,该脚本允许你向 jobs:run 命令传递 --cron 标志,以按 cron 计划定时运行任务:

npx payload jobs:run --cron "*/5 * * * *"

处理顺序

默认情况下,任务按照先进先出(FIFO)的顺序处理。这意味着最先加入队列的任务会最先被处理。不过,你也可以配置任务的执行顺序。

任务队列配置

你可以通过传递 processingOrder 属性来配置任务处理的顺序。这个属性模仿了 Payload 的 sort 功能,类似于 payload.find() 中使用的排序方式。

export default buildConfig({
  // 其他配置...
  jobs: {
    tasks: [
      // 你的任务定义
    ],
    processingOrder: '-createdAt', // 按创建时间倒序处理任务 = 后进先出(LIFO)
  },
})

你也可以针对每个队列单独设置处理顺序:

export default buildConfig({
  // 其他配置...
  jobs: {
    tasks: [
      // 你的任务定义
    ],
    processingOrder: {
      default: 'createdAt', // 先进先出(FIFO)
      queues: {
        nightly: '-createdAt', // 后进先出(LIFO)
        myQueue: '-createdAt', // 后进先出(LIFO)
      },
    },
  },
})

如果需要更精细地控制处理顺序,你可以传递一个返回处理顺序的函数 - 这个函数会在每次队列开始处理任务时被调用。

export default buildConfig({
  // 其他配置...
  jobs: {
    tasks: [
      // 你的任务定义
    ],
    processingOrder: ({ queue }) => {
      if (queue === 'myQueue') {
        return '-createdAt' // 后进先出(LIFO)
      }
      return 'createdAt' // 先进先出(FIFO)
    },
  },
})

本地 API

payload.jobs.queue 方法中,你也可以通过传递 processingOrder 属性来配置任务处理顺序。

const createdJob = await payload.jobs.queue({
  workflow: 'createPostAndUpdate',
  input: {
    title: 'my title',
  },
  processingOrder: '-createdAt', // 按创建时间倒序处理任务 = 后进先出(LIFO)
})