Laden Sie Ihre Warteschlangen mit einem professionellen Frontend auf:
Verschaffen Sie sich einen vollständigen Überblick über alle Ihre Warteschlangen.
Überprüfen Sie Jobs, suchen Sie, versuchen Sie es erneut oder bewerben Sie verzögerte Jobs.
Metriken und Statistiken.
und viele weitere Funktionen.
Melden Sie sich bei Taskforce.sh an
Minimale CPU-Auslastung durch abfragefreies Design.
Robustes Design basierend auf Redis.
Verspätete Aufträge.
Planen und wiederholen Sie Jobs gemäß einer Cron-Spezifikation.
Ratenbegrenzer für Jobs.
Wiederholungen.
Priorität.
Parallelität.
Anhalten/Fortsetzen – global oder lokal.
Mehrere Jobtypen pro Warteschlange.
Threaded-(Sandbox-)Verarbeitungsfunktionen.
Automatische Wiederherstellung nach Prozessabstürzen.
Und die Roadmap steht kurz bevor ...
Bestätigung des Auftragsabschlusses (in der Zwischenzeit können Sie das Nachrichtenwarteschlangenmuster verwenden).
Eltern-Kind-Berufsbeziehungen.
Es gibt einige Benutzeroberflächen von Drittanbietern, die Sie zur Überwachung verwenden können:
BullMQ
Taskforce
Bull v3
Taskforce
Bullenbrett
Bull-Repl
Bullenmonitor
Monitoro
Bulle <= v2
Matador
React-Bull
Toureiro
Mit Prometheus Bull Queue Exporter
Da es einige Jobwarteschlangenlösungen gibt, finden Sie hier eine Vergleichstabelle:
Dragonfly ist ein neuer Redis™-Drop-In-Ersatz, der vollständig mit BullMQ kompatibel ist und einige wichtige Vorteile gegenüber Redis™ bietet, wie z. B. eine deutlich bessere Leistung durch Nutzung aller verfügbaren CPU-Kerne sowie schnellere und speichereffizientere Datenstrukturen. Lesen Sie hier mehr über die Verwendung mit BullMQ. | |
Wenn Sie für Ihr Bull-Projekt qualitativ hochwertige Redis-Produktionsinstanzen benötigen, ziehen Sie bitte ein Abonnement bei Memetria for Redis in Betracht, dem führenden Anbieter von Redis-Hosting, der perfekt mit BullMQ funktioniert. Verwenden Sie bei der Anmeldung den Promo-Code „BULLMQ“, um uns dabei zu helfen, die Entwicklung von BullMQ zu sponsern! |
Besonderheit | BullMQ-Pro | BullMQ | Stier | Kue | Biene | Agenda |
---|---|---|---|---|---|---|
Backend | redis | redis | redis | redis | redis | Mongo |
Observable | ✓ | |||||
Gruppentarifbegrenzung | ✓ | |||||
Gruppenunterstützung | ✓ | |||||
Batch-Unterstützung | ✓ | |||||
Eltern-/Kind-Abhängigkeiten | ✓ | ✓ | ||||
Prioritäten | ✓ | ✓ | ✓ | ✓ | ✓ | |
Parallelität | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Verzögerte Aufträge | ✓ | ✓ | ✓ | ✓ | ✓ | |
Globale Ereignisse | ✓ | ✓ | ✓ | ✓ | ||
Ratenbegrenzer | ✓ | ✓ | ✓ | |||
Pause/Fortsetzen | ✓ | ✓ | ✓ | ✓ | ||
Sandkastenarbeiter | ✓ | ✓ | ✓ | |||
Wiederholbare Aufträge | ✓ | ✓ | ✓ | ✓ | ||
Atomare Operationen | ✓ | ✓ | ✓ | ✓ | ||
Beharrlichkeit | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Benutzeroberfläche | ✓ | ✓ | ✓ | ✓ | ✓ | |
Optimiert für | Jobs / Nachrichten | Jobs / Nachrichten | Jobs / Nachrichten | Jobs | Nachrichten | Jobs |
npm install bull --save
oder
Garn hinzufügen Bulle
Anforderungen: Bull erfordert eine Redis-Version größer oder gleich 2.8.18
.
npm install @types/bull --save-dev
Garn add --dev @types/bull
Definitionen werden derzeit im DefinitelyTyped-Repository verwaltet.
Wir freuen uns über alle Arten von Beiträgen, sei es Codekorrekturen, neue Funktionen oder Dokumentverbesserungen. Die Codeformatierung wird von Prettyer erzwungen. Befolgen Sie für Commits bitte die herkömmlichen Commits-Konventionen. Der gesamte Code muss Lint-Regeln und Testsuiten erfüllen, bevor er in die Entwicklung integriert werden kann.
const Queue = require('bull');const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');const audioQueue = new Queue('audio transcoding', { redis: { port : 6379, Host: '127.0.0.1', Passwort: 'foobared' } }); // Redis-Verbindung mit Objekt angebenconst imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (job, done) { // job.data enthält die benutzerdefinierten Daten, die beim Erstellen des Jobs übergeben wurden // job.id enthält die ID dieses Jobs. // Video asynchron transkodieren und Fortschritt melden job.progress(42); // Aufruf beendet, wenn fertig Erledigt(); // oder einen Fehler ausgeben, wenn ein Fehler vorliegt done(new Error('error transcoding')); // oder ein Ergebnis übergeben done(null, { Framerate: 29,5 /* etc... */ }); // Wenn der Job eine nicht behandelte Ausnahme auslöst, wird diese ebenfalls korrekt behandelt throw new Error('some unerwarteter Fehler');});audioQueue.process(function (job, done) { // Audio asynchron transkodieren und Fortschritt melden job.progress(42); // Aufruf beendet, wenn fertig Erledigt(); // oder einen Fehler ausgeben, wenn ein Fehler vorliegt done(new Error('error transcoding')); // oder ein Ergebnis übergeben done(null, { samplerate: 48000 /* etc... */ }); // Wenn der Job eine nicht behandelte Ausnahme auslöst, wird diese ebenfalls korrekt behandelt throw new Error('some unerwarteter Fehler');});imageQueue.process(function (job, done) { // Bild asynchron transkodieren und Fortschritt melden job.progress(42); // Aufruf beendet, wenn fertig Erledigt(); // oder einen Fehler ausgeben, wenn ein Fehler vorliegt done(new Error('error transcoding')); // oder ein Ergebnis übergeben done(null, { width: 1280, height: 720 /* etc... */ }); // Wenn der Job eine nicht behandelte Ausnahme auslöst, wird diese ebenfalls korrekt behandelt throw new Error('some unerwarteter Fehler');});pdfQueue.process(function (job) { // Prozessoren können auch Versprechen zurückgeben, anstatt den fertigen Rückruf zu verwenden return pdfAsyncProcessor();});videoQueue.add({ video: 'http://example.com/video1.mov' });audioQueue.add({ audio: 'http://example.com/audio1.mp3 ' });imageQueue.add({ image: 'http://example.com/image1.tiff' });
Alternativ können Sie Versprechen zurückgeben, anstatt den done
Rückruf zu verwenden:
videoQueue.process(function (job) { // Vergessen Sie nicht, den Fertig-Rückruf zu entfernen! // Geben Sie einfach ein Versprechen zurück return fetchVideo(job.data.url).then(transcodeVideo); // Behandelt die Ablehnung von Versprechen return Promise.reject(new Error('error transcoding')); // Übergibt den Wert, mit dem das Versprechen aufgelöst wird, an das „completed“-Ereignis return Promise.resolve({ Framerate: 29.5 /* etc... */ }); // Wenn der Job eine nicht behandelte Ausnahme auslöst, wird diese ebenfalls korrekt behandelt throw new Error('irgendein unerwarteter Fehler'); // das Gleiche wie return Promise.reject(new Error('some unerwarteter Fehler'));});
Die Prozessfunktion kann auch in einem separaten Prozess ausgeführt werden. Dies hat mehrere Vorteile:
Der Prozess ist in einer Sandbox ausgeführt, sodass ein Absturz keine Auswirkungen auf den Worker hat.
Sie können Blockierungscode ausführen, ohne die Warteschlange zu beeinträchtigen (Jobs werden nicht angehalten).
Viel bessere Ausnutzung von Multicore-CPUs.
Weniger Verbindungen zu Redis.
Um diese Funktion zu nutzen, erstellen Sie einfach eine separate Datei mit dem Prozessor:
// Prozessor.jsmodule.exports = Funktion (Job) { // Erledige schwere Arbeit return Promise.resolve(result);}
Und definieren Sie den Prozessor wie folgt:
// Einzelner Prozess:queue.process('/path/to/my/processor.js');// Sie können auch Parallelität verwenden:queue.process(5, '/path/to/my/processor.js' );// und benannte Prozessoren:queue.process('mein Prozessor', 5, '/path/to/my/processor.js');
Ein Job kann einer Warteschlange hinzugefügt und gemäß einer Cron-Spezifikation wiederholt verarbeitet werden:
paymentQueue.process(function (job) {// Zahlungen prüfen }); // Zahlungsauftrag einmal täglich um 3:15 Uhr wiederholen paymentQueue.add( paymentsData, { repeat: { cron: '15 3 * * *' } });
Als Tipp: Überprüfen Sie Ihre Ausdrücke hier, um sicherzustellen, dass sie korrekt sind: Cron-Ausdrucksgenerator
Eine Warteschlange kann global angehalten und fortgesetzt werden (übergeben Sie true
um die Verarbeitung nur für diesen Worker anzuhalten):
queue.pause().then(function () { // Warteschlange ist jetzt angehalten});queue.resume().then(function () { // Warteschlange wird jetzt fortgesetzt})
Eine Warteschlange gibt einige nützliche Ereignisse aus, zum Beispiel ...
.on('abgeschlossen', Funktion (Auftrag, Ergebnis) { // Job mit Ausgabeergebnis abgeschlossen!})
Weitere Informationen zu Ereignissen, einschließlich der vollständigen Liste der ausgelösten Ereignisse, finden Sie in der Ereignisreferenz
Warteschlangen sind günstig. Wenn Sie also viele davon benötigen, erstellen Sie einfach neue mit anderen Namen:
const userJohn = new Queue('john');const userLisa = new Queue('lisa');...
Für jede Warteschlangeninstanz sind jedoch neue Redis-Verbindungen erforderlich. Überprüfen Sie, wie Sie Verbindungen wiederverwenden können, oder Sie können auch benannte Prozessoren verwenden, um ein ähnliches Ergebnis zu erzielen.
HINWEIS: Ab Version 3.2.0 und höher wird empfohlen, stattdessen Thread-Prozessoren zu verwenden.
Warteschlangen sind robust und können in mehreren Threads oder Prozessen parallel ausgeführt werden, ohne dass das Risiko von Gefahren oder Warteschlangenbeschädigungen besteht. Sehen Sie sich dieses einfache Beispiel an, bei dem Cluster verwendet wird, um Jobs über Prozesse hinweg zu parallelisieren:
const Queue = require('bull');const cluster = require('cluster');const numWorkers = 8;const queue = new Queue('test concurrent queue');if (cluster.isMaster) { for (let i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (worker) {// Erstellen wir ein paar Jobs für die Warteschlangen-Workerfor (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); cluster.on('exit', function (worker, code, signal) {console.log('worker ' + worker.process.pid + ' died'); });} anders { queue.process(function (job, jobDone) {console.log('Job erledigt vom Worker', cluster.worker.id, job.id);jobDone(); });}
Die vollständige Dokumentation finden Sie in der Referenz und den allgemeinen Mustern:
Leitfaden – Ihr Ausgangspunkt für die Entwicklung mit Bull.
Referenz – Referenzdokument mit allen verfügbaren Objekten und Methoden.
Muster – eine Reihe von Beispielen für gängige Muster.
Lizenz – die Bull-Lizenz – es ist MIT.
Wenn Sie etwas sehen, das mehr Dokumente gebrauchen könnte, senden Sie bitte eine Pull-Anfrage!
Die Warteschlange zielt auf eine „mindestens einmal“-Arbeitsstrategie ab. Dies bedeutet, dass in manchen Situationen ein Auftrag mehr als einmal bearbeitet werden kann. Dies geschieht meist dann, wenn ein Mitarbeiter es versäumt, während der gesamten Verarbeitungsdauer eine Sperre für einen bestimmten Job aufrechtzuerhalten.
Wenn ein Mitarbeiter einen Auftrag verarbeitet, bleibt der Auftrag „gesperrt“, sodass andere Mitarbeiter ihn nicht bearbeiten können.
Es ist wichtig zu verstehen, wie die Sperrung funktioniert, um zu verhindern, dass Ihre Jobs ihre Sperre verlieren, ins Stocken geraten und dadurch neu gestartet werden. Die Sperrung wird intern implementiert, indem eine Sperre für lockDuration
im Intervall lockRenewTime
erstellt wird (normalerweise die Hälfte lockDuration
). Wenn lockDuration
abläuft, bevor die Sperre erneuert werden kann, gilt der Job als angehalten und wird automatisch neu gestartet; es wird doppelt verarbeitet . Dies kann passieren, wenn:
Der Knotenprozess, der Ihren Jobprozessor ausführt, wird unerwartet beendet.
Ihr Jobprozessor war zu CPU-intensiv und hat die Node-Ereignisschleife blockiert, weshalb Bull die Jobsperre nicht erneuern konnte (siehe Nr. 488, wie wir dies besser erkennen können). Sie können dieses Problem beheben, indem Sie Ihren Jobprozessor in kleinere Teile aufteilen, sodass kein einzelner Teil die Node-Ereignisschleife blockieren kann. Alternativ können Sie einen größeren Wert für die Einstellung lockDuration
übergeben (mit dem Nachteil, dass es länger dauert, einen wirklich blockierten Job zu erkennen).
Daher sollten Sie immer auf das stalled
Ereignis achten und es in Ihrem Fehlerüberwachungssystem protokollieren, da dies bedeutet, dass Ihre Jobs wahrscheinlich doppelt verarbeitet werden.
Als Schutzmaßnahme, damit problematische Jobs nicht auf unbestimmte Zeit neu gestartet werden (z. B. wenn der Jobprozessor seinen Node-Prozess immer zum Absturz bringt), werden Jobs maximal maxStalledCount
-Mal aus einem angehaltenen Zustand wiederhergestellt (Standard: 1
).