使用專業的前端來增強您的隊列:
全面了解所有隊列。
檢查作業、搜尋、重試或提升延遲的作業。
指標和統計數據。
以及更多功能。
在 Taskforce.sh 上註冊
由於採用無輪詢設計,CPU 使用率降至最低。
基於Redis的穩健設計。
延遲工作。
根據 cron 規範安排和重複作業。
工作的速率限制器。
重試。
優先事項。
並發性。
暫停/恢復-全域或本地。
每個佇列有多種作業類型。
線程化(沙盒)處理函數。
從進程崩潰中自動復原。
以及路線圖上的內容...
作業完成確認(同時可以使用訊息佇列模式)。
親子工作關係。
您可以使用一些第三方 UI 進行監控:
BullMQ
工作小組
公牛v3
工作小組
牛板
公牛複製
公牛監視器
莫尼托羅
公牛 <= v2
鬥牛士
反應公牛
圖雷羅
使用 Prometheus Bull Queue Exporter
由於存在一些作業佇列解決方案,因此下表對它們進行了比較:
Dragonfly 是一種新的Redis™ 直接替代品,與BullMQ 完全相容,並帶來了一些優於Redis™ 的重要優勢,例如透過利用所有可用CPU 核心以及更快、記憶體效率更高的資料結構來大幅提高性能。請在此處閱讀有關如何將其與 BullMQ 一起使用的更多資訊。 | |
如果您的 Bull 專案需要高品質的生產 Redis 實例,請考慮訂閱 Memetria for Redis,它是 Redis 託管領域的領導者,可與 BullMQ 完美配合。註冊時使用促銷代碼「BULLMQ」來幫助我們贊助 BullMQ 的開發! |
特徵 | BullMQ-Pro | BullMQ | 公牛 | 奎 | 蜜蜂 | 議程 |
---|---|---|---|---|---|---|
後端 | 雷迪斯 | 雷迪斯 | 雷迪斯 | 雷迪斯 | 雷迪斯 | 蒙戈 |
可觀測值 | ✓ | |||||
團體速率限制 | ✓ | |||||
團體支持 | ✓ | |||||
批量支持 | ✓ | |||||
父/子依賴關係 | ✓ | ✓ | ||||
優先事項 | ✓ | ✓ | ✓ | ✓ | ✓ | |
並發性 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
延遲的工作 | ✓ | ✓ | ✓ | ✓ | ✓ | |
全球事件 | ✓ | ✓ | ✓ | ✓ | ||
速率限制器 | ✓ | ✓ | ✓ | |||
暫停/恢復 | ✓ | ✓ | ✓ | ✓ | ||
沙盒工人 | ✓ | ✓ | ✓ | |||
可重複的工作 | ✓ | ✓ | ✓ | ✓ | ||
原子操作 | ✓ | ✓ | ✓ | ✓ | ||
堅持 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
使用者介面 | ✓ | ✓ | ✓ | ✓ | ✓ | |
優化用於 | 職位/留言 | 職位/留言 | 職位/留言 | 工作機會 | 留言 | 工作機會 |
npm 安裝 bull --save
或者
紗線加牛
要求: Bull 要求 Redis 版本大於或等於2.8.18
。
npm install @types/bull --save-dev
紗線添加--dev @types/bull
定義目前維護在DefinitelyTyped 儲存庫中。
我們歡迎所有類型的貢獻,無論是程式碼修復、新功能還是文件改進。代碼格式由 prettier 強制執行。對於提交,請遵循常規提交約定。所有程式碼必須通過 lint 規則和測試套件才能合併到開發中。
const Queue = require('bull');const videoQueue = new Queue('視訊轉碼', 'redis://127.0.0.1:6379');const audioQueue = new Queue('音訊轉碼', { redis: { port : 6379,主機:'127.0.0.1',密碼:'foobared' } }); // 使用object指定Redis連接 const imageQueue = new Queue('圖片轉碼');const pdfQueue = new Queue('pdf轉碼');videoQueue.process(function (job, did) { // job.data 包含建立作業時傳遞的自訂數據 // job.id 包含該作業的 id。 // 非同步轉碼影片並報告進度 工作進度(42); // 完成後呼叫done 完畢(); // 如果出錯則給出錯誤 done(new Error('轉碼錯誤')); // 或傳遞一個結果 完成(null, { 幀率: 29.5 /* 等等... */ }); // 如果作業拋出未處理的例外,它也會被正確處理 拋出新的錯誤('一些意外的錯誤');});audioQueue.process(函數(作業,完成){ // 非同步轉碼音訊並報告進度 工作進度(42); // 完成後呼叫done 完畢(); // 如果出錯則給出錯誤 done(new Error('轉碼錯誤')); // 或傳遞一個結果 完成(null, { 取樣率: 48000 /* 等等... */ }); // 如果作業拋出未處理的例外,它也會被正確處理 throw new Error('一些意外錯誤');});imageQueue.process(function (job, done) { // 非同步轉碼圖像並報告進度 工作進度(42); // 完成後呼叫done 完畢(); // 如果出錯則給出錯誤 done(new Error('轉碼錯誤')); // 或傳遞一個結果 完成(null, { 寬度: 1280, 高度: 720 /* 等等... */ }); // 如果作業拋出未處理的例外,它也會被正確處理 throw new Error('一些意外錯誤');});pdfQueue.process(function (job) { // 處理器也可以返回 Promise,而不是使用 done 回調 return pdfAsyncProcessor();});videoQueue.add({ video: 'http://example.com/video1.mov' });audioQueue.add({ audio: 'http://example.com/audio1.mp3 ' });imageQueue.add({ image: 'http://example.com/image1.tiff' });
或者,您可以返回承諾而不是使用done
回調:
videoQueue.process(function (job) { // 不要忘記刪除done回呼! // 簡單地回傳一個承諾 返回 fetchVideo(job.data.url).then(transcodeVideo); // 處理承諾拒絕 return Promise.reject(new Error('錯誤轉碼')); // 將承諾解決的值傳遞給「完成」事件 return Promise.resolve({ 幀率: 29.5 /* 等等... */ }); // 如果作業拋出未處理的例外,它也會被正確處理 throw new Error('一些意外錯誤'); // 與 return Promise.reject(new Error('一些意外錯誤'));});
進程函數也可以在單獨的進程中運行。這有幾個優點:
該進程是沙盒的,因此如果它崩潰,不會影響工作人員。
您可以執行阻塞程式碼而不影響佇列(作業不會停止)。
更好地利用多核心 CPU。
與 Redis 的連結較少。
為了使用此功能,只需使用處理器建立一個單獨的檔案:
//processor.jsmodule.exports = 函數(作業){ // 做一些繁重的工作 返回 Promise.resolve(結果);}
並這樣定義處理器:
// 單一進程:queue.process('/path/to/my/processor.js');// 也可以使用並發:queue.process(5, '/path/to/my/processor.js' ); // 和命名處理器:queue.process('myprocessor', 5, '/path/to/my/processor.js');
可以將作業新增至佇列並根據 cron 規範重複處理:
paymentQueue.process(function (job) {// 檢查付款 }); // 每天凌晨 3:15 重複付款作業 paymentQueue.add( paymentsData, { 重複: { cron: '15 3 * * *' } });
作為提示,請在此處檢查您的表達式以驗證它們是否正確:cron 表達式產生器
佇列可以全域暫停和恢復(傳遞true
來暫停該工作進程的處理):
隊列.pause().then(函數() { // 佇列現在暫停});queue.resume().then(function () { // 隊列現在恢復})
隊列發出一些有用的事件,例如...
.on('完成', 函數 (作業, 結果) { // 作業完成並輸出結果!
有關事件的更多信息,包括觸發的事件的完整列表,請查看事件參考
佇列很便宜,因此如果您需要很多佇列,只需建立具有不同名稱的新佇列即可:
const userJohn = new Queue('john');const userLisa = new Queue('lisa');...
但是,每個佇列實例都需要新的 Redis 連接,請檢查如何重複使用連接,或者您也可以使用命名處理器來實現類似的結果。
注意:從 3.2.0 及更高版本開始,建議使用執行緒處理器。
隊列非常強大,可以在多個執行緒或進程中並行運行,而不會出現任何危險或隊列損壞的風險。檢查這個使用叢集跨進程並行作業的簡單範例:
const Queue = require('bull');const cluster = require('cluster');const numWorkers = 8;const queue = new Queue('測試並發隊列');if (cluster.isMaster) { for (令 i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (worker) {// 讓我們為佇列工人建立一些作業for (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); cluster.on('退出', function (worker, code, signal) {console.log('worker' + worker.process.pid + '死了'); });} 別的 { queue.process(function (job, jobDone) {console.log('worker 完成的作業', cluster.worker.id, job.id);jobDone(); });}
有關完整文檔,請查看參考和常見模式:
指南 — 您與 Bull 一起發展的起點。
參考 - 包含所有可用物件和方法的參考文件。
模式——一組常見模式的範例。
許可證——公牛許可證——它是麻省理工學院。
如果您發現任何可以使用更多文件的內容,請提交拉取請求!
該隊列的目標是「至少一次」工作策略。這意味著在某些情況下,一項作業可能會被處理多次。當工作人員在處理的整個持續時間內未能保持給定作業的鎖定時,通常會發生這種情況。
當工作人員正在處理作業時,它將保持該作業“鎖定”,以便其他工作人員無法處理它。
重要的是要了解鎖定如何運作,以防止您的作業失去鎖定(變得停滯)並因此重新啟動。鎖定是透過在lockRenewTime
間隔(通常是lockDuration
一半)上為lockDuration
建立鎖定來內部實現的。如果在鎖定更新之前超過了lockDuration
,則作業將被視為停止並自動重新啟動;它將被雙重處理。這可能發生在以下情況:
執行作業處理器的 Node 程序意外終止。
您的作業處理器過於佔用 CPU 資源,導致 Node 事件循環停滯,因此 Bull 無法更新作業鎖(請參閱 #488 以了解如何更好地偵測到這一點)。您可以透過將作業處理器分成更小的部分來解決此問題,這樣任何單一部分都不會阻塞 Node 事件循環。或者,您可以為lockDuration
設定傳遞一個更大的值(代價是需要更長的時間來識別真正停滯的作業)。
因此,您應該始終偵聽stalled
事件並將其記錄到錯誤監視系統,因為這表示您的作業可能會被雙重處理。
作為一種保護措施,有問題的作業不會無限期地重新啟動(例如,如果作業處理器總是使其節點進程崩潰),作業將從停滯狀態恢復最多maxStalledCount
次(預設值: 1
)。