プロフェッショナルなフロントエンドでキューを強化します。
すべてのキューの完全な概要を取得します。
ジョブを検査し、遅延したジョブを検索、再試行、またはプロモートします。
メトリクスと統計。
他にも多くの機能があります。
Taskforce.sh にサインアップします
ポーリング不要の設計により、CPU 使用率が最小限に抑えられます。
Redis に基づいた堅牢な設計。
遅れた仕事。
cron 仕様に従ってジョブをスケジュールし、繰り返します。
ジョブのレート制限。
再試行します。
優先度。
同時実行性。
グローバルまたはローカルで一時停止/再開します。
キューごとに複数のジョブ タイプ。
スレッド化された (サンドボックス化された) 処理関数。
プロセスクラッシュからの自動回復。
そしてロードマップに登場するのは...
ジョブ完了の確認 (その間はメッセージ キュー パターンを使用できます)。
親子の仕事関係。
監視に使用できるサードパーティ UI がいくつかあります。
ブルMQ
タスクフォース
ブル v3
タスクフォース
掲示板
ブルレプル
ブルモニター
モニターロ
雄牛 <= v2
マタドール
反応する雄牛
トゥレイロ
Prometheus Bull Queue Exporter を使用
ジョブ キュー ソリューションはいくつかあるので、それらを比較した表を次に示します。
Dragonfly は、BullMQ と完全に互換性のある新しい Redis™ ドロップイン代替品であり、利用可能なすべての CPU コアを利用することによる大幅なパフォーマンスの向上や、より高速でメモリ効率の高いデータ構造など、Redis™ に比べていくつかの重要な利点をもたらします。 BullMQ での使用方法の詳細については、こちらをお読みください。 | |
Bull プロジェクトに高品質の本番 Redis インスタンスが必要な場合は、BullMQ と完全に連携する Redis ホスティングのリーダーである Memetria for Redis のサブスクライブを検討してください。 BullMQ の開発のスポンサーになるために、サインアップ時にプロモーション コード「BULLMQ」を使用してください。 |
特徴 | BullMQ-プロ | ブルMQ | ブル | クエ | 蜂 | 議題 |
---|---|---|---|---|---|---|
バックエンド | レディス | レディス | レディス | レディス | レディス | モンゴ |
観測可能なもの | ✓ | |||||
グループレート制限 | ✓ | |||||
グループサポート | ✓ | |||||
バッチのサポート | ✓ | |||||
親/子の依存関係 | ✓ | ✓ | ||||
優先事項 | ✓ | ✓ | ✓ | ✓ | ✓ | |
同時実行性 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
遅延したジョブ | ✓ | ✓ | ✓ | ✓ | ✓ | |
世界的なイベント | ✓ | ✓ | ✓ | ✓ | ||
レートリミッター | ✓ | ✓ | ✓ | |||
一時停止/再開 | ✓ | ✓ | ✓ | ✓ | ||
サンドボックス化されたワーカー | ✓ | ✓ | ✓ | |||
反復可能なジョブ | ✓ | ✓ | ✓ | ✓ | ||
原子作戦 | ✓ | ✓ | ✓ | ✓ | ||
持続性 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
UI | ✓ | ✓ | ✓ | ✓ | ✓ | |
に最適化 | ジョブ/メッセージ | ジョブ/メッセージ | ジョブ/メッセージ | 求人 | メッセージ | 求人 |
npm install bull --save
または
糸追加雄牛
要件: Bull には、Redis バージョン2.8.18
以上が必要です。
npm install @types/bull --save-dev
糸追加 --dev @types/bull
定義は現在、DefinitelyTyped リポジトリに維持されています。
コードの修正、新機能、ドキュメントの改善など、あらゆる種類の貢献を歓迎します。コードのフォーマットは、prettier によって強制されます。コミットについては、従来のコミット規則に従ってください。すべてのコードは、開発にマージする前に、lint ルールとテスト スイートに合格する必要があります。
const Queue = require('bull');const videoQueue = new Queue('ビデオ トランスコーディング', 'redis://127.0.0.1:6379');const audioQueue = new Queue('オーディオ トランスコーディング', { redis: { port : 6379、ホスト: '127.0.0.1'、パスワード: 'foabared' } }); // objectを使用してRedis接続を指定しますconst imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (job,ned) { // job.data には、ジョブの作成時に渡されたカスタム データが含まれています // job.id にはこのジョブの ID が含まれます。 // ビデオを非同期にトランスコードし、進捗状況を報告します job.progress(42); // 終了したら、done を呼び出します 終わり(); // エラーの場合はエラーを返す 完了(新しいエラー('トランスコーディングエラー')); // または結果を渡します 完了(null, { フレームレート: 29.5 /* etc... */ }); // ジョブが未処理の例外をスローした場合も、正しく処理されます throw new Error('予期しないエラー');});audioQueue.process(function (ジョブ、完了) { // 音声を非同期にトランスコードし、進行状況を報告する job.progress(42); // 終了したら、done を呼び出します 終わり(); // エラーの場合はエラーを返す 完了(新しいエラー('トランスコーディングエラー')); // または結果を渡します 完了(null, { サンプルレート: 48000 /* など... */ }); // ジョブが未処理の例外をスローした場合も、正しく処理されます throw new Error('予期せぬエラー');});imageQueue.process(function (ジョブ, 完了) { // 画像を非同期にトランスコードし、進行状況を報告する job.progress(42); // 終了したら、done を呼び出します 終わり(); // エラーの場合はエラーを返す 完了(新しいエラー('トランスコーディングエラー')); // または結果を渡します 完了(null, { 幅: 1280, 高さ: 720 /* など... */ }); // ジョブが未処理の例外をスローした場合も、正しく処理されます throw new Error('予期せぬエラー');});pdfQueue.process(function (job) { // プロセッサは、done コールバックを使用する代わりに Promise を返すこともできます return pdfAsyncProcessor();});videoQueue.add({ ビデオ: 'http://example.com/video1.mov' });audioQueue.add({ オーディオ: 'http://example.com/audio1.mp3 ' });imageQueue.add({ 画像: 'http://example.com/image1.tiff' });
あるいは、 done
コールバックを使用する代わりに Promise を返すこともできます。
videoQueue.process(function (job) { // 完了コールバックを削除することを忘れないでください。 //単にPromiseを返すだけです return fetchVideo(job.data.url).then(transcodeVideo); // プロミスの拒否を処理します return Promise.reject(new Error('エラートランスコーディング')); // Promise が解決された値を「completed」イベントに渡します return Promise.resolve({ フレームレート: 29.5 /* etc... */ }); // ジョブが未処理の例外をスローした場合も、正しく処理されます throw new Error('予期しないエラー'); // と同じ return Promise.reject(new Error('予期せぬエラー'));});
process 関数は別のプロセスで実行することもできます。これにはいくつかの利点があります。
プロセスはサンドボックス化されているため、クラッシュしてもワーカーには影響しません。
キューに影響を与えることなく、ブロッキング コードを実行できます (ジョブは停止しません)。
マルチコア CPU の使用率が大幅に向上します。
Redis への接続が少なくなります。
この機能を使用するには、プロセッサで別のファイルを作成するだけです。
//processor.jsmodule.exports = 関数 (ジョブ) { // 重い作業を行う return Promise.resolve(result);}
そしてプロセッサを次のように定義します。
// 単一プロセス:queue.process('/path/to/my/processor.js');// 同時実行も使用できます:queue.process(5, '/path/to/my/processor.js' );// および名前付きプロセッサ:queue.process('myprocessor', 5, '/path/to/my/processor.js');
ジョブはキューに追加され、cron 仕様に従って繰り返し処理できます。
paymentQueue.process(function (job) {// 支払いを確認します }); // 毎日 1 回、午前 3 時 15 分に支払いジョブを繰り返します paymentsQueue.add(paymentsData, { 繰り返し: { cron: '15 3 * * *' } });
ヒントとして、ここで式をチェックして、それらが正しいことを確認してください: cron 式ジェネレーター
キューはグローバルに一時停止および再開できます (このワーカーのみの処理を一時停止するにはtrue
を渡します)。
queue.pause().then(function() { // キューは現在一時停止中});queue.resume().then(function () { // キューが再開されました})
キューはいくつかの有用なイベントを発行します。たとえば...
.on('completed', 関数 (ジョブ, 結果) { // ジョブは出力結果とともに完了しました!})
発生するイベントの完全なリストなど、イベントの詳細については、「イベント リファレンス」を参照してください。
キューは安価なので、多数のキューが必要な場合は、別の名前で新しいキューを作成するだけです。
const userJohn = new Queue('john');const userLisa = new Queue('lisa');...
ただし、すべてのキュー インスタンスには新しい Redis 接続が必要です。接続を再利用する方法を確認するか、名前付きプロセッサを使用して同様の結果を達成することもできます。
注: バージョン 3.2.0 以降では、代わりにスレッドプロセッサを使用することをお勧めします。
キューは堅牢であり、危険やキュー破損のリスクを伴うことなく、複数のスレッドまたはプロセスで並行して実行できます。クラスターを使用してプロセス間でジョブを並列化するこの簡単な例を確認してください。
const Queue = require('bull');const cluster = require('cluster');const numWorkers = 8;const queue = new Queue('test concurrent queue');if (cluster.isMaster) { for (let i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (worker) {// キューにいくつかのジョブを作成しましょう worksfor (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); クラスタ.on('終了', 関数 (ワーカー, コード, シグナル) {console.log('ワーカー ' + ワーカー.プロセス.pid + ' 死亡'); });} それ以外 { queue.process(function (job, jobDone) {console.log('ワーカーによって実行されたジョブ',cluster.worker.id, job.id);jobDone(); });}
完全なドキュメントについては、リファレンスおよび一般的なパターンを確認してください。
ガイド — Bull を使用した開発の出発点です。
リファレンス — 利用可能なすべてのオブジェクトとメソッドを含むリファレンス ドキュメント。
パターン — 一般的なパターンの例のセット。
ライセンス — Bull ライセンス — それは MIT です。
さらにドキュメントを使用できるものを見つけた場合は、プル リクエストを送信してください。
キューは「少なくとも 1 回」の動作戦略を目指しています。これは、状況によっては、ジョブが複数回処理される可能性があることを意味します。これは主に、ワーカーが処理の合計期間中に特定のジョブのロックを維持できなかった場合に発生します。
ワーカーがジョブを処理しているときは、他のワーカーがジョブを処理できないようにジョブを「ロック」したままにします。
ジョブがロックを失い、停止し、その結果再起動されることを防ぐために、ロックがどのように機能するかを理解することが重要です。ロックはlockRenewTime
間隔 (通常はlockDuration
半分) でlockDuration
のロックを作成することによって内部的に実装されます。ロックを更新できる前にlockDuration
経過すると、ジョブは停止したとみなされ、自動的に再開されます。二重処理となります。これは次の場合に発生する可能性があります。
ジョブ プロセッサを実行しているノード プロセスが予期せず終了します。
ジョブ プロセッサが CPU を集中的に使用しすぎてノード イベント ループが停止し、その結果、Bull はジョブ ロックを更新できませんでした (これをより適切に検出する方法については #488 を参照してください)。この問題は、ジョブ プロセッサを小さな部分に分割して、単一の部分が Node イベント ループをブロックできないようにすることで修正できます。あるいは、 lockDuration
設定により大きな値を渡すこともできます (実際に停止したジョブを認識するのに時間がかかるというトレードオフがあります)。
したがって、ジョブが二重処理される可能性があることを意味するため、常にstalled
イベントをリッスンし、これをエラー監視システムに記録する必要があります。
問題のあるジョブが無期限に再起動されないための安全策として (たとえば、ジョブ プロセッサがノード プロセスを常にクラッシュさせる場合)、ジョブは最大maxStalledCount
回 (デフォルト: 1
) 停止状態から回復されます。