Redis 支援的任務佇列引擎,具有高階任務控制和最終一致性。
大範圍的任務分組、連結、迭代器。
推遲和計劃的任務運行。
負載分配+工作池。
易於嵌入。
idoit
提供了先進的控制來實現
分組。特殊group
任務執行子任務並等待所有任務完成。對於映射/歸約邏輯很有用。
連鎖。特殊的chain
任務一一執行孩子。對於映射縮減或將非常複雜的任務拆分為更簡單的步驟也很有用。
映射迭代器。針對巨大有效負載的特殊功能,可按需產生區塊。好處:
映射階段沒有滯後,區塊處理立即開始。
易於優化資料庫查詢以建立相同大小的區塊(跳過+限制查詢在大數據上非常慢)。
進步。當您使用群組/鏈/地圖場景時,可以輕鬆地透過頂級父級監控總進度。長獨立任務還可以通知用戶進度變化。
工人池。您可以按不同的流程拆分任務。例如,如果您不希望繁重的任務阻礙輕量的任務。
調度者。內建 cron 允許按給定的時間表執行任務。
Redis中的所有數據都是永遠一致的。
任務不會遺失,但可以在邊緣情況下運行兩次(如果任務功能即將完成時進程崩潰)
如果使用task.progressAdd()
並且在任務完成之前進程崩潰,進度可以「更快」地計數。但這並不重要,因為此類資訊只能用於介面進度條更新。在大多數情況下,您不會看到差異。
需要node.js
6+ 和redis
3.0+。
npm 安裝 idoit --save
redisURL (String) - redis 連線 url。
concurrency (Number) - 單一工作執行緒並行消耗的最大任務數,預設為 100。
pool (String) - 工作池名稱,如果未設定則為「預設」。如果此佇列實例僅消耗任務(在.start()
之後),則使用。您可以將任務路由到特定的工作人員池,以避免不必要的鎖定。您可以將pool
設定為數組, [ 'pool1', 'pool2' ]
以使用多個池中的任務(用於開發/測試目的)。
ns (String) - 資料命名空間,目前用作 redis 鍵前綴,預設為「idoitqueue:」。
對於繁重的阻塞任務和非阻塞任務,有單獨的工作池是一個很好的做法。例如,任何人都不應阻止發送緊急電子郵件。因此,創建多個工作進程,將它們固定到不同的池中並設定適當的任務並發性。非阻塞任務可以並行消費, concurrency
concurrency
=100就可以了。
筆記。您可能會從應用程式中刪除某些任務類型。在這種情況下,孤立資料將在 3 天後被擦除。
選項:
name (字串)- 任務的名稱。
baseClass (Function) - 可選,基本任務的建構函數,預設為「Task」。
init (Function) - 可選,用於非同步任務初始化,應返回Promise
this (Object) - 目前任務(任務總計可用作this.total
)。
taskID (函數)- 可選,應傳回新的任務 ID。僅在建立「獨佔」任務時需要,預設會傳回隨機值,稱為: function (taskData)
。 Sugar:如果您傳遞純字串,它將包裝到始終傳回該字串的函數。
process (Function) - 主任務函數,稱為: task.process(...args)
。應該返回Promise
this (物件)- 目前任務。
retry (Number) - 可選,出錯時重試的次數,預設 2。
retryDelay (Number) - 可選,重試後的延遲(以毫秒為單位),預設為 60000 毫秒。
timeout (Number) - 可選,執行逾時,預設為 120000 毫秒。
Total (Number) - 可選,最大進度值,預設 1。
延遲延遲(數字) - 可選,如果沒有延遲地呼叫推遲,則假定延遲等於該值(以毫秒為單位)。
cron (String) - 可選,cron 字串 ("15 */6 * * *"),預設為 null。
軌道(數量)- 預設 3600000 毫秒(1 小時)。是時候記住 cron 中的計劃任務了,以避免在叢集中的多個伺服器時脈錯誤時重新運行。對於非常頻繁的任務不要設定得太高,因為它會佔用大量記憶體。
透過id獲取任務。傳回一個透過任務解決的 Promise,如果任務不存在則傳回null
。
您可以使用的任務欄位:
總計- 任務總進度
進度- 目前任務進度
result - 任務結果
error - 任務錯誤
取消任務。返回一個透過任務解決的 Promise。
筆記。您只能取消沒有父任務的任務。
啟動worker並開始任務資料消耗。返回Promise
,當佇列準備好時解決(在內部呼叫.ready()
)。
如果在 structor 中指定了pool
,則只會消耗路由到此拉取的任務。
停止從佇列中接受新任務。返回Promise
,當該工作執行緒中的所有活動任務完成時已解決。
返回Promise
,當佇列準備好操作時解決(在「connect」事件之後,請參閱下文)。
更新建構函式選項,redisURL 除外。
idoit
是一個EventEmitter
實例,它觸發一些事件:
當redis連線建立並且可以執行指令時ready
(無需連線即可註冊任務)
發生錯誤時發生error
。
task:progress
, task:progress:<task_id>
- 任務更新進度時。事件資料為:{ id、uid、total、progress }
task:end
, task:end:<task_id>
- 任務結束時。事件資料為:{ id, uid }
使用可選參數建立新任務。
覆蓋任務屬性。例如,您可能希望將特定的群組/鏈任務指派給另一個池。
立即運行任務。傳回一個有任務 id 的 Promise。
推遲任務執行以delay
毫秒(或task.postponeDelay
)。
傳回一個有任務 id 的 Promise。
重新啟動目前正在運行的任務。
add_retry (Boolean) - 可選,是否增加重試次數(預設值: false)
如果true
,重試次數會增加,且如果超出則任務不會重新啟動
如果為false
,重試計數保持不變,因此任務可以無限期地自行重新啟動
延遲(數字)重新啟動之前的延遲(以毫秒為單位)(預設值: task.retryDelay
)。
請注意, idoit
已經內建了針對任務錯誤的重新啟動邏輯。也許,您不應該直接使用此方法。它針對非常具體的情況而公開。
增加目前任務進度。
傳回一個有任務 id 的 Promise。
更新目前任務截止日期。
傳回一個有任務 id 的 Promise。
建立一個新任務,並行執行子任務。
隊列.組([ 隊列.children1(), 隊列.children2(), 隊列.children3()]).run()
群組結果是未排序的子結果陣列。
建立一個新任務,連續執行子任務。如果任何一個孩子失敗了,那麼鏈條也會失敗。
queue.registerTask('乘法', (a, b) => a * b);queue.registerTask('減法', (a, b) => a - b);queue.chain([ 隊列.multiply(2, 3), // 2 * 3 = 6 隊列.subtract(10), // 10 - 6 = 4 隊列.multiply(3) // 3 * 4 = 12]).run()
上一個任務的結果作為下一個任務的最後一個參數傳遞。鏈的結果是鏈中最後一個任務的結果。
以惰性方式運行大型映射的特殊方式(按需)。請參閱下面的評論。
// 註冊迭代器 taskqueue.registerTask({ 名稱:'lazy_mapper', 基底類別:Queue.Iterator, // 此方法在任務開始和每個子任務結束時呼叫。它可以是 // 生成器函數或傳回 `Promise` 的函數。 * iterate(state) {// ...// 三種可能的輸出狀態:結束、不執行任何操作和新資料。 。新迭代器狀態(例如,偏移量// 資料庫查詢),任何可序列化的資料 // 任務 - 要推入佇列的新子任務數組 // }//// 重要!迭代器可以從不同的工作執行緒並行呼叫。 // 我們使用輸入 `state` 來解決 redis 更新時的衝突。因此,如果您// 建立新的子任務://// 1. 新的`state` 必須不同(對於所有先前的狀態)// 2. `tasks` 陣列不得為空。情況下,您應該發出關於 'end' 或 'idle' 的信號。 }});// 執行 iteratorqueue.lazy_mapper().run();
為什麼要發明這種瘋狂的魔法?
想像一下,您需要重建 1000 萬個論壇貼文。您希望將工作分成相等的小塊,但帖子沒有順序整數枚舉,只有 mongo ID。你能做什麼?
直接skip
+ limit
請求對於任何資料庫中的大集合來說都是非常昂貴的。
您不能按日期間隔拆分,因為第一篇文章到最後一篇文章的帖子密度差異很大。
您可以為每個帖子添加帶有隨機數字的索引欄位。然後按間隔分割。這可以工作,但會導致隨機磁碟存取 - 不太酷。
解決方案是使用迭代映射器,它可以記住“先前的位置”。在這種情況下,您將執行range
+ limit
請求,而不是skip
+ limit
。這對於資料庫來說效果很好。額外獎金有:
您不需要將所有子任務保留在佇列中。例如,您可以建立 100 個區塊,並在前一個區塊即將完成時新增下一個 100 個。
映射階段變得分散,您可以立即開始監控整體進度。
透過 docker Quik 運行 redis:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker 停止redis1 docker rm redis1
當然,我們熟悉 kue、celery 和 akka。我們的目標是在簡單性和功能之間取得平衡。因此,我們不知道idoit
在具有數千個實例的叢集中是否運作良好。但小體積應該沒問題,而且使用起來也很方便。
kue 無法滿足我們的需求,因為:
它的「優先順序」概念不靈活,並且不能很好地防止繁重任務的鎖定
沒有任務分組/連結等
沒有強有力的數據一致性保證
在idoit
中我們關心:
任務組/鏈操作&在任務之間傳遞資料(類似celery)
工作池依類型隔離任務執行。
易於使用和安裝(僅需要redis,可在現有進程中運行)
儲存資料的最終一致性
像內建調度程序一樣必不可少的糖
用於巨大有效負載的迭代映射器(獨特的功能,對於許多維護任務非常有用)
任務進度追蹤
避免全域鎖
Redis 仍然可能是一個故障點,但出於簡單性考慮,這是可以接受的價格。當然,您可以透過 RMQ 等分散式訊息總線獲得更好的可用性。但在許多情況下,讓事情保持簡單更為重要。借助idoit
您可以重複使用現有技術,而無需額外費用。