Von Redis unterstützte Task-Queue-Engine mit erweiterter Task-Steuerung und letztendlicher Konsistenz.
Aufgabengruppierung, Verkettung, Iteratoren für große Bereiche.
Verschobene und geplante Aufgabenausführung.
Lastverteilung + Worker-Pools.
Einfach einzubetten.
idoit
bietet erweiterte Kontrolle zur Implementierung
Gruppierung . Spezielle group
führen untergeordnete Aufgaben aus und warten, bis alle abgeschlossen sind. Nützlich für die Zuordnungs-/Reduzierungslogik.
Verkettung . Eine spezielle chain
führt die Kinder einzeln aus. Auch nützlich für die Kartenreduzierung oder die Aufteilung sehr komplizierter Aufgaben in einfachere Schritte.
Mapping-Iterator . Besondere Funktion für große Nutzlasten, um Brocken nach Bedarf zu produzieren. Vorteile:
Keine Verzögerungen in der Mapping-Phase, die Chunk-Verarbeitung beginnt sofort.
Einfach zu optimierende DB-Abfragen, um Blöcke gleicher Größe zu erstellen (Abfragen überspringen und begrenzen sind bei großen Datenmengen sehr langsam).
Fortschritt . Wenn Sie Gruppen-/Ketten-/Kartenszenarien verwenden, ist es einfach, den Gesamtfortschritt über das oberste übergeordnete Element zu überwachen. Lange eigenständige Aufgaben können den Benutzer auch über Fortschrittsänderungen benachrichtigen.
Arbeiterpools . Sie können Aufgaben nach verschiedenen Prozessen aufteilen. Wenn Sie beispielsweise nicht möchten, dass schwere Aufgaben leichte Aufgaben blockieren.
Planer . Der integrierte Cron ermöglicht die Ausführung von Aufgaben nach einem vorgegebenen Zeitplan.
Alle Daten in Redis sind stets konsistent.
Die Aufgabe kann nicht verloren gehen, kann aber in Grenzfällen zweimal ausgeführt werden (wenn der Prozess abstürzt, als die Aufgabenfunktion kurz vor dem Abschluss stand).
Der Fortschritt kann „schneller“ gezählt werden, wenn task.progressAdd()
verwendet wird und der Prozess abstürzt, bevor die Aufgabe abgeschlossen ist. Dies ist jedoch nicht kritisch, da solche Informationen nur für Aktualisierungen der Fortschrittsbalken der Benutzeroberfläche verwendet werden können. In den meisten Fällen werden Sie den Unterschied nicht bemerken.
node.js
6+ und redis
3.0+ erforderlich.
npm install idoit --save
redisURL (String) – Redis-Verbindungs-URL.
Parallelität (Anzahl) – maximale Anzahl von Aufgaben, die von einem einzelnen Worker parallel verarbeitet werden können, standardmäßig 100.
pool (String) – Name des Worker-Pools, „Standard“, wenn nicht festgelegt. Wird verwendet, wenn diese Warteschlangeninstanz nur Aufgaben verbraucht (nach .start()
). Sie können Aufgaben an bestimmte Worker-Pools weiterleiten, um unerwünschte Sperren zu vermeiden. Sie können pool
auf Array, [ 'pool1', 'pool2' ]
setzen, um Aufgaben aus mehreren Pools zu nutzen (für Entwicklungs-/Testzwecke).
ns (String) – Daten-Namespace, wird derzeit als Präfix für Redis-Schlüssel verwendet, standardmäßig „idoitqueue:“.
Es empfiehlt sich, separate Worker-Pools für stark blockierende und nicht blockierende Aufgaben zu haben. Beispielsweise sollte niemand den Versand dringender E-Mails blockieren. Erstellen Sie also mehrere Worker-Prozesse, heften Sie diese an verschiedene Pools und legen Sie die richtige Aufgaben-Parallelität fest. Nicht blockierende Aufgaben können parallel bearbeitet werden, und Sie können mit der Standard- concurrency
= 100 zufrieden sein. Blockierende Aufgaben sollten einzeln verarbeitet werden, legen Sie für diese Worker concurrency
= 1 fest.
Notiz. Es kann vorkommen, dass Sie einige Aufgabentypen aus Ihrer App entfernen. In diesem Fall werden verwaiste Daten nach 3 Tagen gelöscht.
Optionen:
name (String) – der Name der Aufgabe.
baseClass (Funktion) – optional, Konstruktor der Basisaufgabe, standardmäßig „Task“.
init (Funktion) – optional, wird für die Initialisierung asynchroner Aufgaben verwendet und sollte Promise
zurückgeben
this (Object) – aktuelle Aufgabe (Aufgabensumme ist verfügbar als this.total
).
taskID (Funktion) – optional, sollte neue Aufgaben-ID zurückgeben. Wird nur zum Erstellen „exklusiver“ Aufgaben benötigt, gibt standardmäßig einen Zufallswert zurück, aufgerufen als: function (taskData)
. Zucker: Wenn Sie eine einfache Zeichenfolge übergeben, wird diese in eine Funktion umschlossen, die immer diese Zeichenfolge zurückgibt.
Prozess (Funktion) – Hauptaufgabenfunktion, aufgerufen als: task.process(...args)
. Sollte Promise
zurückgeben
this (Object) – aktuelle Aufgabe.
Wiederholung (Anzahl) – optional, Anzahl der Wiederholungen bei Fehler, Standard 2.
retryDelay (Zahl) – optional, Verzögerung in ms nach Wiederholungsversuchen, Standard 60000 ms.
timeout (Zahl) – optional, Ausführungszeitlimit, Standard 120000 ms.
total (Zahl) – optional, maximaler Fortschrittswert, Standardwert 1. Wenn Sie das Verhalten nicht ändern, beginnt der Fortschritt bei 0 und wird am Ende der Aufgabe 1.
verschiebenDelay (Zahl) – optional. Wenn „Verschieben“ ohne Verzögerung aufgerufen wird, wird davon ausgegangen, dass die Verzögerung diesem Wert (in Millisekunden) entspricht.
cron (String) – optional, Cron-String („15 */6 * * *“), Standardwert null.
Spur (Anzahl) – Standard 3600000 ms (1 Stunde). Zeit, sich geplante Aufgaben von Cron zu merken, um eine erneute Ausführung zu vermeiden, wenn mehrere Server im Cluster falsche Uhren haben. Stellen Sie den Wert für sehr häufige Aufgaben nicht zu hoch ein, da dies viel Speicher beanspruchen kann.
Aufgabe nach ID abrufen. Gibt ein Versprechen zurück, das mit der Aufgabe oder mit null
gelöst wird, wenn die Aufgabe nicht vorhanden ist.
Aufgabenfelder, die Sie verwenden können:
total – Gesamtfortschritt der Aufgabe
Fortschritt – aktueller Aufgabenfortschritt
result – das Aufgabenergebnis
Fehler – der Aufgabenfehler
Aufgabe abbrechen. Gibt ein Versprechen zurück, das mit der Aufgabe gelöst wurde.
Notiz. Sie können nur Aufgaben ohne übergeordnete Aufgaben abbrechen.
Starten Sie den Worker und beginnen Sie mit der Datenverarbeitung durch die Aufgabe. Return Promise
, wird aufgelöst, wenn die Warteschlange bereit ist (rufen Sie .ready()
darin auf).
Wenn im Cunstructor pool
angegeben wurde, werden nur an diesen Pull weitergeleitete Aufgaben verbraucht.
Hören Sie auf, neue Aufgaben aus der Warteschlange anzunehmen. Return Promise
, wird aufgelöst, wenn alle aktiven Aufgaben in diesem Worker abgeschlossen sind.
Return Promise
, wird aufgelöst, wenn die Warteschlange betriebsbereit ist (nach dem „connect“-Ereignis, siehe unten).
Konstruktoroptionen aktualisieren, außer redisURL.
idoit
ist eine EventEmitter
Instanz, die einige Ereignisse auslöst:
ready
wenn die Redis-Verbindung besteht und Befehle ausgeführt werden können (Aufgaben können ohne Verbindung registriert werden)
error
wenn ein Fehler aufgetreten ist.
task:progress
, task:progress:<task_id>
– wenn der Aktualisierungsfortschritt der Aufgabe voranschreitet. Ereignisdaten sind: { id, uid, total, progress }
task:end
, task:end:<task_id>
– wenn die Aufgabe endet. Ereignisdaten sind: { id, uid }
Erstellen Sie eine neue Aufgabe mit optionalen Parametern.
Aufgabeneigenschaften überschreiben. Beispielsweise möchten Sie möglicherweise bestimmte Gruppen-/Kettenaufgaben einem anderen Pool zuweisen.
Aufgabe sofort ausführen. Gibt ein mit der Aufgaben-ID gelöstes Versprechen zurück.
Verschieben Sie die Aufgabenausführung, um Millisekunden zu delay
(oder auf task.postponeDelay
).
Gibt ein mit der Aufgaben-ID gelöstes Versprechen zurück.
Starten Sie die aktuell ausgeführte Aufgabe neu.
add_retry (Boolean) – optional, ob die Anzahl der Wiederholungsversuche erhöht werden soll oder nicht (Standard: false)
Bei true
wird die Anzahl der Wiederholungsversuche erhöht und die Aufgabe wird nicht neu gestartet, falls sie überschritten wird
Bei false
bleibt die Anzahl der Wiederholungsversuche gleich, sodass sich eine Aufgabe auf unbestimmte Zeit neu starten kann
Verzögerung (Zahl) Verzögerung vor dem Neustart in Millisekunden (Standard: task.retryDelay
).
Beachten Sie, dass idoit
bereits über eine integrierte Neustartlogik bei Aufgabenfehlern verfügt. Wahrscheinlich sollten Sie diese Methode nicht direkt verwenden. Es wird für ganz bestimmte Fälle aufgedeckt.
Erhöhen Sie den aktuellen Aufgabenfortschritt.
Gibt ein mit der Aufgaben-ID gelöstes Versprechen zurück.
Aktuelle Aufgabenfrist aktualisieren.
Gibt ein mit der Aufgaben-ID gelöstes Versprechen zurück.
Erstellen Sie eine neue Aufgabe und führen Sie gleichzeitig untergeordnete Elemente aus.
queue.group([ queue.children1(), queue.children2(), queue.children3()]).run()
Das Gruppenergebnis ist ein unsortiertes Array von untergeordneten Ergebnissen.
Erstellen Sie eine neue Aufgabe und führen Sie die Kinder nacheinander aus. Wenn eines der untergeordneten Elemente ausfällt, fällt auch die Kette aus.
queue.registerTask('multiply', (a, b) => a * b);queue.registerTask('subtract', (a, b) => a - b);queue.chain([ queue.multiply(2, 3), // 2 * 3 = 6 queue.subtract(10), // 10 - 6 = 4 queue.multiply(3) // 3 * 4 = 12]).run()
Ergebnis der vorherigen Aufgabe als letztes Argument der nächsten Aufgabe übergeben. Das Ergebnis der Kette ist das Ergebnis der letzten Aufgabe in der Kette.
Eine besondere Möglichkeit, umfangreiche Mappings im Lazy-Stil (auf Abruf) auszuführen. Siehe Kommentare unten.
// Iterator registrieren taskqueue.registerTask({ Name: 'lazy_mapper', Basisklasse: Queue.Iterator, // Diese Methode wird am Anfang der Aufgabe und am Ende jedes untergeordneten Elements aufgerufen. Es kann sein // eine Generatorfunktion oder Funktion, die „Promise“ zurückgibt. * iterate(state) {// ...// Drei Arten von Ausgabezuständen möglich: beendet, nichts tun & neue Daten.//// 1. „null“ – Ende erreicht, Iterator sollte nicht mehr aufgerufen werden.// 2. „{}“ – inaktiv, es sind genügend Unteraufgaben in der Warteschlange, versuchen Sie, den // Iterator später aufzurufen (wenn das nächste untergeordnete Element fertig ist).// 3. {// Status – neuer zu merkender Iteratorstatus (z. B. Offset). für// Datenbankabfrage), alle serialisierbaren Daten// Aufgaben – Array neuer Unteraufgaben, die in die Warteschlange verschoben werden sollen// }//// WICHTIG! Der Iterator kann von verschiedenen Workern parallel aufgerufen werden. Wir// verwenden die Eingabe „state“, um Kollisionen bei der Redis-Aktualisierung aufzulösen. Wenn Sie also// neue Unteraufgaben erstellen: //// 1. Der neue „Zustand“ MUSS unterschiedlich sein (für alle vorherigen Zustände)// 2. Das Array „Aufgaben“ DARF NICHT leer sein.//// In anderen Fällen Sie sollte über „Ende“ oder „Leerlauf“ signalisieren.//// Ungültige Kombination führt zu „Ende“ + Fehlerereignis.//return { state: newState, task: chunksArray}; }});// run iteratorqueue.lazy_mapper().run();
Warum wurde diese verrückte Magie erfunden?
Stellen Sie sich vor, Sie müssten 10 Millionen Forenbeiträge neu erstellen. Sie möchten die Arbeit in gleiche kleine Teile aufteilen, aber Beiträge haben keine sequentielle Ganzzahlaufzählung, sondern nur Mongo-IDs. Was können Sie tun?
Direkte skip
und limit
-Anfragen sind bei großen Sammlungen in jeder Datenbank sehr teuer.
Eine Aufteilung nach Datumsintervallen ist nicht möglich, da die Beitragsdichte vom ersten bis zum letzten Beitrag stark variiert.
Sie können jedem Beitrag ein indiziertes Feld mit einer Zufallszahl hinzufügen. Dann nach Intervallen aufteilen. Das wird funktionieren, aber es wird einen zufälligen Festplattenzugriff verursachen – nicht cool.
Die Lösung besteht darin, einen iterativen Mapper zu verwenden, der sich die „vorherige Position“ merken kann. In diesem Fall werden Sie range
+ limit
-Anfragen anstelle von skip
+ limit
ausführen. Das funktioniert gut mit Datenbanken. Zusätzliche Boni sind:
Sie müssen nicht alle Unteraufgaben in der Warteschlange halten. Sie können beispielsweise 100 Blöcke erstellen und die nächsten 100 hinzufügen, wenn die vorherigen fast fertig sind.
Die Mapping-Phase wird verteilt und Sie können sofort mit der Überwachung des Gesamtfortschritts beginnen.
Redis schnell über Docker ausführen:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 Docker RM Redis1
Natürlich kennen wir Kue, Sellerie und Akka. Unser Ziel war es, ein Gleichgewicht zwischen Einfachheit und Leistung zu erreichen. Wir wissen also nicht, ob idoit
im Cluster mit Tausenden von Instanzen gut funktioniert. Bei kleineren Mengen sollte es aber in Ordnung sein und die Anwendung ist wirklich einfach.
kue war für unsere Bedürfnisse nicht in Ordnung, weil:
Das „Prioritäten“-Konzept ist nicht flexibel und schützt nicht gut vor Sperren durch schwere Aufgaben
Keine Aufgabengruppierung/-verkettung usw
keine starken Garantien für die Datenkonsistenz
Im idoit
haben wir uns um Folgendes gekümmert:
Aufgaben gruppieren/verketten Operationen und übergeben Daten zwischen Aufgaben (ähnlich wie Sellerie)
Worker-Pools, um die Aufgabenausführung nach Typen zu isolieren.
einfach zu verwenden und zu installieren (nur Redis erforderlich, kann im bestehenden Prozess ausgeführt werden)
letztendliche Konsistenz der gespeicherten Daten
essentieller Zucker wie integrierter Zeitplaner
Iterativer Mapper für große Nutzlasten (einzigartige Funktion, sehr nützlich für viele Wartungsaufgaben)
Verfolgung des Aufgabenfortschritts
Vermeiden Sie globale Sperren
Redis kann immer noch ein Fehlerpunkt sein, aber der Einfachheit halber ist das ein akzeptabler Preis. Natürlich können Sie über verteilte Nachrichtenbusse wie RMQ eine bessere Verfügbarkeit erreichen. Aber in vielen Fällen ist es wichtiger, die Dinge einfach zu halten. Mit idoit
können Sie bestehende Technologien ohne zusätzliche Kosten wiederverwenden.