使用专业的前端来增强您的队列:
全面了解所有队列。
检查作业、搜索、重试或提升延迟的作业。
指标和统计数据。
以及更多功能。
在 Taskforce.sh 上注册
由于采用无轮询设计,CPU 使用率降至最低。
基于Redis的稳健设计。
延迟工作。
根据 cron 规范安排和重复作业。
工作的速率限制器。
重试。
优先事项。
并发性。
暂停/恢复——全局或本地。
每个队列有多种作业类型。
线程化(沙盒)处理函数。
从进程崩溃中自动恢复。
以及路线图上的内容...
作业完成确认(同时可以使用消息队列模式)。
亲子工作关系。
您可以使用一些第三方 UI 进行监控:
BullMQ
工作组
公牛v3
工作组
牛板
公牛复制
公牛监视器
莫尼托罗
公牛 <= v2
斗牛士
反应公牛
图雷罗
使用 Prometheus Bull Queue Exporter
由于存在一些作业队列解决方案,因此下表对它们进行了比较:
Dragonfly 是一种新的 Redis™ 直接替代品,与 BullMQ 完全兼容,并带来了一些优于 Redis™ 的重要优势,例如通过利用所有可用 CPU 内核以及更快、内存效率更高的数据结构来大幅提高性能。请在此处阅读有关如何将其与 BullMQ 一起使用的更多信息。 | |
如果您的 Bull 项目需要高质量的生产 Redis 实例,请考虑订阅 Memetria for Redis,它是 Redis 托管领域的领导者,可与 BullMQ 完美配合。注册时使用促销代码“BULLMQ”来帮助我们赞助 BullMQ 的开发! |
特征 | BullMQ-Pro | BullMQ | 公牛 | 奎 | 蜜蜂 | 议程 |
---|---|---|---|---|---|---|
后端 | 雷迪斯 | 雷迪斯 | 雷迪斯 | 雷迪斯 | 雷迪斯 | 蒙戈 |
可观测值 | ✓ | |||||
团体速率限制 | ✓ | |||||
团体支持 | ✓ | |||||
批量支持 | ✓ | |||||
父/子依赖关系 | ✓ | ✓ | ||||
优先事项 | ✓ | ✓ | ✓ | ✓ | ✓ | |
并发性 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
延迟的工作 | ✓ | ✓ | ✓ | ✓ | ✓ | |
全球事件 | ✓ | ✓ | ✓ | ✓ | ||
速率限制器 | ✓ | ✓ | ✓ | |||
暂停/恢复 | ✓ | ✓ | ✓ | ✓ | ||
沙盒工人 | ✓ | ✓ | ✓ | |||
可重复的工作 | ✓ | ✓ | ✓ | ✓ | ||
原子操作 | ✓ | ✓ | ✓ | ✓ | ||
坚持 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
用户界面 | ✓ | ✓ | ✓ | ✓ | ✓ | |
优化用于 | 职位/留言 | 职位/留言 | 职位/留言 | 工作机会 | 留言 | 工作机会 |
npm 安装 bull --save
或者
纱线加牛
要求: Bull 要求 Redis 版本大于或等于2.8.18
。
npm install @types/bull --save-dev
纱线添加--dev @types/bull
定义当前维护在DefinitelyTyped 存储库中。
我们欢迎所有类型的贡献,无论是代码修复、新功能还是文档改进。代码格式由 prettier 强制执行。对于提交,请遵循常规提交约定。所有代码必须通过 lint 规则和测试套件才能合并到开发中。
const Queue = require('bull');const videoQueue = new Queue('视频转码', 'redis://127.0.0.1:6379');const audioQueue = new Queue('音频转码', { redis: { port : 6379,主机:'127.0.0.1',密码:'foobared' } }); // 使用object指定Redis连接 const imageQueue = new Queue('图片转码');const pdfQueue = new Queue('pdf转码');videoQueue.process(function (job, did) { // job.data 包含创建作业时传递的自定义数据 // job.id 包含该作业的 id。 // 异步转码视频并报告进度 工作进度(42); // 完成后调用done 完毕(); // 如果出错则给出错误 done(new Error('转码错误')); // 或者传递一个结果 完成(null, { 帧率: 29.5 /* 等等... */ }); // 如果作业抛出未处理的异常,它也会被正确处理 抛出新的错误('一些意外的错误');});audioQueue.process(函数(作业,完成){ // 异步转码音频并报告进度 工作进度(42); // 完成后调用done 完毕(); // 如果出错则给出错误 done(new Error('转码错误')); // 或者传递一个结果 完成(null, { 采样率: 48000 /* 等等... */ }); // 如果作业抛出未处理的异常,它也会被正确处理 throw new Error('一些意外错误');});imageQueue.process(function (job, done) { // 异步转码图像并报告进度 工作进度(42); // 完成后调用done 完毕(); // 如果出错则给出错误 done(new Error('转码错误')); // 或者传递一个结果 完成(null, { 宽度: 1280, 高度: 720 /* 等等... */ }); // 如果作业抛出未处理的异常,它也会被正确处理 throw new Error('一些意外错误');});pdfQueue.process(function (job) { // 处理器也可以返回 Promise,而不是使用 done 回调 return pdfAsyncProcessor();});videoQueue.add({ video: 'http://example.com/video1.mov' });audioQueue.add({ audio: 'http://example.com/audio1.mp3 ' });imageQueue.add({ image: 'http://example.com/image1.tiff' });
或者,您可以返回承诺而不是使用done
回调:
videoQueue.process(function (job) { // 不要忘记删除done回调! // 简单地返回一个承诺 返回 fetchVideo(job.data.url).then(transcodeVideo); // 处理承诺拒绝 return Promise.reject(new Error('错误转码')); // 将承诺解决的值传递给“完成”事件 return Promise.resolve({ 帧率: 29.5 /* 等等... */ }); // 如果作业抛出未处理的异常,它也会被正确处理 throw new Error('一些意外错误'); // 与 return Promise.reject(new Error('一些意外错误'));});
进程函数也可以在单独的进程中运行。这有几个优点:
该进程是沙盒的,因此如果它崩溃,不会影响工作人员。
您可以运行阻塞代码而不影响队列(作业不会停止)。
更好地利用多核 CPU。
与 Redis 的连接较少。
为了使用此功能,只需使用处理器创建一个单独的文件:
//processor.jsmodule.exports = 函数(作业){ // 做一些繁重的工作 返回 Promise.resolve(结果);}
并像这样定义处理器:
// 单进程:queue.process('/path/to/my/processor.js');// 也可以使用并发:queue.process(5, '/path/to/my/processor.js' );// 和命名处理器:queue.process('myprocessor', 5, '/path/to/my/processor.js');
可以将作业添加到队列中并根据 cron 规范重复处理:
paymentQueue.process(function (job) {// 检查付款 }); // 每天凌晨 3:15 重复一次付款作业 paymentQueue.add( paymentsData, { 重复: { cron: '15 3 * * *' } });
作为提示,请在此处检查您的表达式以验证它们是否正确:cron 表达式生成器
队列可以全局暂停和恢复(传递true
来暂停该工作进程的处理):
队列.pause().then(函数() { // 队列现在暂停});queue.resume().then(function () { // 队列现在恢复})
队列发出一些有用的事件,例如......
.on('完成', 函数 (作业, 结果) { // 作业完成并输出结果!})
有关事件的更多信息,包括触发的事件的完整列表,请查看事件参考
队列很便宜,因此如果您需要很多队列,只需创建具有不同名称的新队列即可:
const userJohn = new Queue('john');const userLisa = new Queue('lisa');...
但是,每个队列实例都需要新的 Redis 连接,请检查如何重用连接,或者您也可以使用命名处理器来实现类似的结果。
注意:从 3.2.0 及更高版本开始,建议使用线程处理器。
队列非常强大,可以在多个线程或进程中并行运行,而不会出现任何危险或队列损坏的风险。检查这个使用集群跨进程并行作业的简单示例:
const Queue = require('bull');const cluster = require('cluster');const numWorkers = 8;const queue = new Queue('测试并发队列');if (cluster.isMaster) { for (让 i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (worker) {// 让我们为队列工人创建一些作业for (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); cluster.on('退出', function (worker, code, signal) {console.log('worker' + worker.process.pid + '死了'); });} 别的 { queue.process(function (job, jobDone) {console.log('worker 完成的作业', cluster.worker.id, job.id);jobDone(); });}
有关完整文档,请查看参考和常见模式:
指南 — 您与 Bull 一起开发的起点。
参考 - 包含所有可用对象和方法的参考文档。
模式——一组常见模式的示例。
许可证——公牛许可证——它是麻省理工学院。
如果您发现任何可以使用更多文档的内容,请提交拉取请求!
该队列的目标是“至少一次”工作策略。这意味着在某些情况下,一项作业可能会被处理多次。当工作人员在处理的整个持续时间内未能保持给定作业的锁定时,通常会发生这种情况。
当工作人员正在处理作业时,它将保持该作业“锁定”,以便其他工作人员无法处理它。
重要的是要了解锁定如何工作,以防止您的作业失去锁定(变得停滞)并因此重新启动。锁定是通过在lockRenewTime
间隔(通常是lockDuration
一半)上为lockDuration
创建锁来内部实现的。如果在锁更新之前超过了lockDuration
,则作业将被视为停止并自动重新启动;它将被双重处理。这可能发生在以下情况:
运行作业处理器的 Node 进程意外终止。
您的作业处理器过于占用 CPU 资源,导致 Node 事件循环停滞,因此 Bull 无法更新作业锁(请参阅 #488 了解如何更好地检测到这一点)。您可以通过将作业处理器分成更小的部分来解决此问题,这样任何单个部分都不会阻塞 Node 事件循环。或者,您可以为lockDuration
设置传递一个更大的值(代价是需要更长的时间来识别真正停滞的作业)。
因此,您应该始终侦听stalled
事件并将其记录到错误监视系统,因为这意味着您的作业可能会被双重处理。
作为一种保护措施,有问题的作业不会无限期地重新启动(例如,如果作业处理器总是使其节点进程崩溃),作业将从停滞状态恢复最多maxStalledCount
次(默认值: 1
)。