高度なタスク制御と結果整合性を備えた Redis ベースのタスク キュー エンジン。
タスクのグループ化、チェーン化、巨大な範囲のイテレータ。
延期およびスケジュールされたタスクの実行。
負荷分散 + ワーカー プール。
埋め込みも簡単。
idoit
実装するための高度な制御を提供します。
グループ化。特別なgroup
タスクは子タスクを実行し、すべてが完了するまで待ちます。マップ/リデュース ロジックに役立ちます。
連鎖。特別なchain
タスクは子を 1 つずつ実行します。マップリデュースや、非常に複雑なタスクをより単純なステップに分割する場合にも役立ちます。
マッピング反復子。オンデマンドでチャンクを生成する、巨大なペイロード用の特別な機能。利点:
マッピング段階での遅れはなく、チャンクの処理がすぐに開始されます。
DB クエリを最適化して同じサイズのチャンクを構築するのが簡単です (スキップ + 制限クエリは巨大なデータでは非常に遅くなります)。
進捗。グループ/チェーン/マップ シナリオを使用すると、最上位の親を介して全体の進行状況を簡単に監視できます。長時間にわたるスタンドアロン タスクでも、進行状況の変化をユーザーに通知できます。
ワーカープール。タスクを異なるプロセスごとに分割できます。たとえば、重いタスクが軽いタスクをブロックしたくない場合です。
シェデュラー。組み込みの cron により、指定されたスケジュールでタスクを実行できます。
Redis 内のすべてのデータは常に一貫性があります。
タスクが失われることはありませんが、エッジケースでは 2 回実行できます (タスク関数が終了しようとしていたときにプロセスがクラッシュした場合)
task.progressAdd()
が使用され、タスクが完了する前にプロセスがクラッシュすると、進行状況を「より速く」カウントできます。ただし、そのような情報はインターフェイスのプログレスバーの更新にのみ使用できるため、これは重要ではありません。ほとんどの場合、違いはわかりません。
node.js
6 以降およびredis
3.0 以降が必要です。
npm install idoit --save
redisURL (文字列) - Redis 接続 URL。
concurrency (数値) - 単一ワーカーによって並行して消費される最大タスク数 (デフォルトでは 100)。
pool (文字列) - ワーカー プール名、設定されていない場合は「デフォルト」。このキュー インスタンスがタスクのみを消費する場合に使用されます ( .start()
の後)。不要なロックを回避するために、タスクをワーカーの特定のプールにルーティングできます。 pool
を配列[ 'pool1', 'pool2' ]
に設定して、複数のプールからタスクを消費できます (開発/テスト目的)。
ns (文字列) - データ名前空間。現在 Redis キーのプレフィックスとして使用されており、デフォルトでは「idoitqueue:」です。
重度のブロッキング タスクと非ブロッキング タスクに別個のワーカー プールを用意することをお勧めします。たとえば、誰も緊急メールの送信をブロックすべきではありません。したがって、いくつかのワーカー プロセスを作成し、それらを異なるプールに固定し、適切なタスクの同時実行性を設定します。非ブロッキング タスクは並行して実行でき、デフォルトのconcurrency
= 100 で問題ありません。ブロッキング タスクは 1 つずつ実行する必要があり、それらのワーカーにはconcurrency
= 1 を設定します。
注記。一部のタスク タイプをアプリから削除する場合があります。この場合、孤立したデータは 3 日後に消去されます。
オプション:
name (文字列) - タスクの名前。
baseClass (関数) - オプション、基本タスクのコンストラクター、デフォルトでは「タスク」。
init (関数) - オプション。非同期タスクの初期化に使用され、 Promise
返す必要があります。
this (オブジェクト) - 現在のタスク (タスクの合計はthis.total
として利用可能)。
taskID (関数) - オプションで、新しいタスク ID を返す必要があります。 「排他的な」タスクを作成する場合にのみ必要で、 function (taskData)
と呼ばれるデフォルトでランダムな値を返します。 Sugar: プレーンな文字列を渡すと、常にこの文字列を返す関数にラップされます。
process (関数) - メインタスク関数。 task.process(...args)
と呼ばれます。 Promise
返す必要があります
this (オブジェクト) - 現在のタスク。
retry (数値) - オプション、エラー時の再試行回数、デフォルトは 2。
retryDelay (数値) - オプション、再試行後の遅延 (ミリ秒)、デフォルトは 60000 ミリ秒。
timeout (数値) - オプション、実行タイムアウト、デフォルトは 120000 ミリ秒。
total (数値) - オプション、最大進行状況値、デフォルトは 1。動作を変更しない場合、進行状況は 0 から始まり、タスク終了時に 1 になります。
延期Delay (数値) - オプション。遅延なしで延期が呼び出された場合、遅延はこの値 (ミリ秒単位) と等しいとみなされます。
cron (文字列) - オプション、cron 文字列 ("15 */6 * * *")、デフォルトは null。
トラック(番号) - デフォルトは 3600000 ミリ秒 (1 時間)。クラスター内の複数のサーバーのクロックが間違っている場合に再実行を避けるために、cron からスケジュールされたタスクを記憶する時間。大量のメモリを占有する可能性があるため、非常に頻繁なタスクの場合はあまり高く設定しないでください。
IDでタスクを取得します。タスクで解決された Promise を返します。タスクが存在しない場合はnull
で解決されます。
使用できるタスクフィールド:
total - タスクの合計進捗状況
progress - 現在のタスクの進捗状況
result - タスクの結果
error - タスクのエラー
タスクをキャンセルします。タスクで解決された Promise を返します。
注記。親のないタスクのみをキャンセルできます。
ワーカーを開始し、タスク データの消費を開始します。キューの準備ができたときに解決されるPromise
を返します (内部で.ready()
を呼び出します)。
cunstructor でpool
が指定されている場合、このプルにルーティングされたタスクのみが消費されます。
キューからの新しいタスクの受け入れを停止します。 Promise
を返します。このワーカーのすべてのアクティブなタスクが完了すると解決されます。
Promise
を返します。キューが動作する準備ができたときに解決されます (「connect」イベントの後、以下を参照)。
redisURL を除くコンストラクター オプションを更新します。
idoit
いくつかのイベントを発生させるEventEmitter
インスタンスです。
Redis接続が確立され、コマンドが実行できるようになるとready
(接続なしでタスクを登録できます)
エラーが発生した場合はerror
。
task:progress
、 task:progress:<task_id>
- タスクの更新の進行状況。イベントデータ: { id、uid、total、progress }
task:end
、 task:end:<task_id>
- タスクが終了するとき。イベントデータ: { id, uid }
オプションのパラメータを使用して新しいタスクを作成します。
タスクのプロパティをオーバーライドします。たとえば、特定のグループ/チェーン タスクを別のプールに割り当てることができます。
タスクをすぐに実行します。タスク ID で解決された Promise を返します。
タスクの実行を延期してミリ秒単位でdelay
(またはtask.postponeDelay
まで)。
タスク ID で解決された Promise を返します。
現在実行中のタスクを再起動します。
add_retry (ブール値) - オプション、再試行回数を増やすかどうか (デフォルト: false)
true
の場合、再試行回数が増加し、それを超えた場合にはタスクは再起動されません。
false
の場合、再試行回数は変わらないため、タスクは無制限に再起動できます。
遅延(数値) 再起動までの遅延 (ミリ秒単位) (デフォルト: task.retryDelay
)。
idoit
タスク エラー時の再起動ロジックが既に組み込まれていることに注意してください。おそらく、このメソッドを直接使用しないでください。非常に特殊なケースで公開されます。
現在のタスクの進行状況を増分します。
タスク ID で解決された Promise を返します。
現在のタスクの期限を更新します。
タスク ID で解決された Promise を返します。
新しいタスクを作成し、子を並行して実行します。
キュー.グループ([ queue.children1()、 queue.children2()、 queue.children3()]).run()
グループの結果は、子の結果のソートされていない配列です。
新しいタスクを作成し、子を連続して実行します。子のいずれかが失敗すると、チェーンも失敗します。
queue.registerTask('multiply', (a, b) => a * b);queue.registerTask('subtract', (a, b) => a - b);queue.chain([ queue.multiply(2, 3), // 2 * 3 = 6 queue.subtract(10), // 10 - 6 = 4 queue.multiply(3) // 3 * 4 = 12]).run()
前のタスクの結果は、次のタスクの最後の引数として渡されます。チェーンの結果は、チェーン内の最後のタスクの結果です。
巨大なマッピングを遅延スタイル (オンデマンド) で実行する特別な方法。以下のコメントを参照してください。
// イテレータを登録します taskqueue.registerTask({ 名前: 'lazy_mapper', 基本クラス: Queue.Iterator、 // このメソッドはタスクの開始時とすべての子の終了時に呼び出されます。それは可能です // ジェネレーター関数、または `Promise` を返す関数。 * iterate(state) {// ...// 可能な出力状態は 3 種類あります: 終了、何もしない、新しいデータ。//// 1. `null` - 終了に達しました。イテレータはもう呼び出されません。// 2. `{}` - アイドル状態、キューに十分なサブタスクがあります。// 後でイテレータを呼び出してみます (次の子が終了したとき)。// 3. {// state - 記憶する新しいイテレータの状態 (オフセットなど) // db クエリ用)、シリアル化可能なデータ // タスク - キューにプッシュする新しいサブタスクの配列 // }//// 重要!イテレータは、異なるワーカーから並行して呼び出すことができます。 // 入力 `state` を使用して、redis 更新時の衝突を解決します。したがって、// 新しいサブタスクを作成する場合 /// 1. 新しい `state` は (以前のすべての状態に対して) 異なっていなければなりません // 2. `tasks` 配列は空であってはなりません。//// それ以外の場合は、 'end' または 'idle' について通知する必要があります。//// 無効な組み合わせは、'end' + エラー イベントを引き起こします。//return { state: newState, task: chunksArray}; }});// iteratorqueue.lazy_mapper().run(); を実行します。
なぜこのクレイジーな魔法が発明されたのでしょうか?
1,000 万件のフォーラム投稿を再構築する必要があると想像してください。作業を同じ小さなチャンクで分割したいと考えていますが、投稿には連続した整数の列挙がなく、mongo ID のみが含まれています。何ができるでしょうか?
直接skip
+ limit
リクエストは、データベース内の大きなコレクションでは非常にコストがかかります。
投稿密度は最初の投稿から最後の投稿まで大きく異なるため、日付間隔で分割することはできません。
すべての投稿に乱数を含むインデックス付きフィールドを追加できます。次に、間隔ごとに分割します。これでも動作しますが、ランダムなディスク アクセスが発生するため、あまり良くありません。
解決策は、「前の位置」を記憶できる反復マッパーを使用することです。この場合、 skip
+ limit
の代わりにrange
+ limit
リクエストを実行します。これはデータベースとうまく連携します。追加のボーナスは次のとおりです。
すべてのサブタスクをキューに入れておく必要はありません。たとえば、100 個のチャンクを作成し、前のチャンクが終了しようとしているときに次の 100 個を追加できます。
マッピングフェーズが分散され、全体の進行状況の監視をすぐに開始できます。
Docker 経由で Redis を Quik 実行します。
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 docker rm redis1
もちろん、私たちはクエ、セロリ、アッカに精通しています。私たちの目標は、シンプルさとパワーのバランスを取ることでした。したがって、数千のインスタンスを持つクラスターでidoit
適切に機能するかどうかはわかりません。しかし、少量であれば大丈夫ですし、とても使いやすいです。
kue は私たちのニーズには適していませんでした。理由は次のとおりです。
「優先順位」の概念は柔軟ではなく、重いタスクによるロックから十分に保護されません。
タスクのグループ化/チェーン化などはありません
データの一貫性に対する強力な保証はない
idoit
では次の点に注意しました。
タスクグループ/チェーン操作とタスク間でのデータの受け渡し (セロリと同様)
ワーカー プールを使用してタスクの実行をタイプごとに分離します。
使用とインストールが簡単 (redis のみが必要、既存のプロセスで実行可能)
保存されたデータの最終的な整合性
内蔵スケジューラーのような必須の砂糖
巨大なペイロード用の反復マッパー (多くのメンテナンス タスクに非常に役立つ独自の機能)
タスクの進捗状況の追跡
グローバルロックを避ける
Redis は依然として障害点になる可能性がありますが、シンプルさの代償として許容できる範囲です。もちろん、RMQ のような分散メッセージ バスを使用すると、可用性が向上します。しかし多くの場合、物事をシンプルにしておくことがより重要です。 idoit
使用すると、追加費用をかけずに既存のテクノロジーを再利用できます。