Solid Queue ist ein DB-basiertes Warteschlangen-Backend für Active Job, das auf Einfachheit und Leistung ausgelegt ist.
Neben dem regulären Einreihen und Verarbeiten von Jobs unterstützt Solid Queue verzögerte Jobs, Parallelitätskontrollen, wiederkehrende Jobs, pausierende Warteschlangen, numerische Prioritäten pro Job, Prioritäten nach Warteschlangenreihenfolge und Masseneinreihen ( enqueue_all
für perform_all_later
des aktiven Jobs).
Solid Queue kann mit SQL-Datenbanken wie MySQL, PostgreSQL oder SQLite verwendet werden und nutzt die FOR UPDATE SKIP LOCKED
-Klausel, falls verfügbar, um Blockierungen und das Warten auf Sperren beim Abfragen von Jobs zu vermeiden. Es basiert auf Active Job für Wiederholungsversuche, Verwerfen, Fehlerbehandlung, Serialisierung oder Verzögerungen und ist mit dem Multithreading von Ruby on Rails kompatibel.
Solid Queue ist in neuen Rails 8-Anwendungen standardmäßig konfiguriert. Wenn Sie jedoch eine frühere Version ausführen, können Sie diese manuell hinzufügen, indem Sie die folgenden Schritte ausführen:
bundle add solid_queue
bin/rails solid_queue:install
Dadurch wird Solid Queue als Produktions-Active-Job-Backend konfiguriert, die Konfigurationsdateien config/queue.yml
und config/recurring.yml
erstellt und db/queue_schema.rb
erstellt. Außerdem wird ein bin/jobs
-Wrapper für die ausführbare Datei erstellt, den Sie zum Starten von Solid Queue verwenden können.
Sobald Sie dies getan haben, müssen Sie die Konfiguration für die Warteschlangendatenbank in config/database.yml
hinzufügen. Wenn Sie SQLite verwenden, sieht es so aus:
production :
primary :
<< : *default
database : storage/production.sqlite3
queue :
<< : *default
database : storage/production_queue.sqlite3
migrations_paths : db/queue_migrate
...oder wenn Sie MySQL/PostgreSQL/Trilogy verwenden:
production :
primary : &primary_production
<< : *default
database : app_production
username : app
password : <%= ENV["APP_DATABASE_PASSWORD"] %>
queue :
<< : *primary_production
database : app_production_queue
migrations_paths : db/queue_migrate
Hinweis: Durch den Aufruf bin/rails solid_queue:install
wird config.solid_queue.connects_to = { database: { writing: :queue } }
automatisch zu config/environments/production.rb
hinzugefügt, sodass dort keine zusätzliche Konfiguration erforderlich ist (obwohl Sie sicherstellen müssen). dass Sie den queue
in database.yml
verwenden, damit dieser übereinstimmt!). Wenn Sie Solid Queue jedoch in einer anderen Umgebung (z. B. Staging oder sogar Entwicklung) verwenden möchten, müssen Sie diese Zeile config.solid_queue.connects_to
manuell zur entsprechenden Umgebungsdatei hinzufügen. Und stellen Sie wie immer sicher, dass der Name, den Sie für die Datenbank in config/database.yml
verwenden, mit dem Namen übereinstimmt, den Sie in config.solid_queue.connects_to
verwenden.
Führen Sie dann db:prepare
in der Produktion aus, um sicherzustellen, dass die Datenbank erstellt und das Schema geladen wird.
Jetzt können Sie mit der Verarbeitung von Jobs beginnen, indem Sie bin/jobs
auf dem Server ausführen, der die Arbeit erledigt. Dadurch werden Aufträge in allen Warteschlangen mit der Standardkonfiguration verarbeitet. Weitere Informationen zum Konfigurieren von Solid Queue finden Sie weiter unten.
Bei kleinen Projekten können Sie Solid Queue auf demselben Computer wie Ihren Webserver ausführen. Wenn Sie zur Skalierung bereit sind, unterstützt Solid Queue sofort die horizontale Skalierung. Sie können Solid Queue auf einem von Ihrem Webserver getrennten Server ausführen oder bin/jobs
sogar auf mehreren Computern gleichzeitig ausführen. Abhängig von der Konfiguration können Sie einige Maschinen so festlegen, dass sie nur Dispatcher oder nur Worker ausführen. Weitere Einzelheiten hierzu finden Sie im Abschnitt „Konfiguration“.
Hinweis : Zukünftige Änderungen am Schema werden in Form regelmäßiger Migrationen erfolgen.
Es wird empfohlen, Solid Queue in einer separaten Datenbank auszuführen, es ist jedoch auch möglich, eine einzige Datenbank sowohl für die App als auch für die Warteschlange zu verwenden. Befolgen Sie einfach diese Schritte:
db/queue_schema.rb
in eine normale Migration und löschen Sie db/queue_schema.rb
config.solid_queue.connects_to
aus production.rb
bin/jobs
auszuführen Sie verfügen nicht über mehrere Datenbanken, daher benötigt database.yml
keine Primär- und Warteschlangendatenbank.
Wenn Sie planen, Solid Queue schrittweise einzuführen, indem Sie jeweils einen Job wechseln, können Sie dies tun, indem Sie den config.active_job.queue_adapter
auf Ihrem alten Backend festgelegt lassen und dann den queue_adapter
direkt in den Jobs festlegen, die Sie verschieben:
# app/jobs/my_job.rb
class MyJob < ApplicationJob
self . queue_adapter = :solid_queue
# ...
end
Solid Queue wurde für den höchsten Durchsatz bei Verwendung mit MySQL 8+ oder PostgreSQL 9.5+ entwickelt, da diese FOR UPDATE SKIP LOCKED
unterstützen. Sie können es mit älteren Versionen verwenden, aber in diesem Fall kann es zu Sperrwartezeiten kommen, wenn Sie mehrere Worker für dieselbe Warteschlange ausführen. Sie können es auch mit SQLite für kleinere Anwendungen verwenden.
Wir haben verschiedene Arten von Akteuren in Solid Queue:
solid_queue_ready_executions
.solid_queue_scheduled_executions
in die Tabelle solid_queue_ready_executions
verschoben, damit die Mitarbeiter sie abholen können. Darüber hinaus führen sie einige Wartungsarbeiten im Zusammenhang mit der Parallelitätskontrolle durch.Der Vorgesetzte von Solid Queue wird für jeden beaufsichtigten Arbeiter/Dispatcher/Planer einen separaten Prozess aufteilen.
Standardmäßig versucht Solid Queue, Ihre Konfiguration unter config/queue.yml
zu finden, Sie können jedoch mithilfe der Umgebungsvariablen SOLID_QUEUE_CONFIG
oder mithilfe der Option -c/--config_file
mit bin/jobs
einen anderen Pfad festlegen, etwa so:
bin/jobs -c config/calendar.yml
So sieht diese Konfiguration aus:
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
workers :
- queues : " * "
threads : 3
polling_interval : 2
- queues : [ real_time, background ]
threads : 5
polling_interval : 0.1
processes : 3
Alles ist optional. Wenn überhaupt keine Konfiguration bereitgestellt wird, läuft Solid Queue mit einem Dispatcher und einem Worker mit Standardeinstellungen. Wenn Sie nur Dispatcher oder Worker ausführen möchten, müssen Sie nur diesen Abschnitt allein in die Konfiguration aufnehmen. Beispielsweise mit folgender Konfiguration:
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
Der Vorgesetzte wird einen Dispatcher und keine Arbeiter leiten.
Hier ein Überblick über die verschiedenen Optionen:
polling_interval
: Das Zeitintervall in Sekunden, das Arbeiter und Disponenten warten, bevor sie nach weiteren Aufträgen suchen. Diese Zeit beträgt standardmäßig 1
Sekunde für Disponenten und 0.1
Sekunden für Arbeiter.
batch_size
: Der Dispatcher versendet Jobs in Stapeln dieser Größe. Der Standardwert ist 500.
concurrency_maintenance_interval
: Das Zeitintervall in Sekunden, das der Dispatcher wartet, bevor er nach blockierten Jobs sucht, die entsperrt werden können. Lesen Sie mehr über Parallelitätskontrollen, um mehr über diese Einstellung zu erfahren. Der Standardwert beträgt 600
Sekunden.
queues
: Die Liste der Warteschlangen, aus denen Arbeiter Jobs auswählen. Sie können *
verwenden, um alle Warteschlangen anzugeben (dies ist auch die Standardeinstellung und das Verhalten, das Sie erhalten, wenn Sie dies weglassen). Sie können eine einzelne Warteschlange oder eine Liste von Warteschlangen als Array bereitstellen. Jobs werden der Reihe nach aus diesen Warteschlangen abgefragt, sodass beispielsweise bei [ real_time, background ]
keine Jobs aus background
übernommen werden, es sei denn, in real_time
warten keine weiteren Jobs. Sie können auch ein Präfix mit einem Platzhalter angeben, um Warteschlangen zuzuordnen, die mit einem Präfix beginnen. Zum Beispiel:
staging :
workers :
- queues : staging*
threads : 3
polling_interval : 5
Dadurch wird ein Worker erstellt, der Jobs aus allen Warteschlangen abruft, beginnend mit staging
. Der Platzhalter *
ist nur allein oder am Ende eines Warteschlangennamens zulässig; Sie können keine Warteschlangennamen wie *_some_queue
angeben. Diese werden ignoriert.
Schließlich können Sie Präfixe mit genauen Namen kombinieren, z. B. [ staging*, background ]
, und das Verhalten in Bezug auf die Reihenfolge ist das gleiche wie bei nur genauen Namen.
In den folgenden Abschnitten erfahren Sie, wie sich die Warteschlangenreihenfolge in Kombination mit Prioritäten verhält und wie sich die Art und Weise, wie Sie die Warteschlangen pro Worker angeben, auf die Leistung auswirken kann.
threads
: Dies ist die maximale Größe des Thread-Pools, den jeder Worker zum Ausführen von Jobs haben muss. Jeder Worker ruft höchstens diese Anzahl an Jobs aus seiner/seinen Warteschlange(n) ab und stellt sie zur Ausführung in den Thread-Pool. Standardmäßig ist dies 3
. Nur Arbeiter haben diese Einstellung.
processes
: Dies ist die Anzahl der Arbeitsprozesse, die vom Supervisor mit den angegebenen Einstellungen geforkt werden. Standardmäßig ist dies 1
, nur ein einzelner Prozess. Diese Einstellung ist nützlich, wenn Sie einer oder mehreren Warteschlangen mit derselben Konfiguration mehr als einen CPU-Kern zuweisen möchten. Nur Arbeiter haben diese Einstellung.
concurrency_maintenance
: Ob der Dispatcher die Parallelitätswartungsarbeiten durchführen wird. Dies true
standardmäßig und ist nützlich, wenn Sie keine Parallelitätskontrollen verwenden und diese deaktivieren möchten oder wenn Sie mehrere Dispatcher ausführen und möchten, dass einige von ihnen nur Jobs versenden, ohne etwas anderes zu tun.
Wenn Sie wie oben erwähnt eine Liste von Warteschlangen für einen Worker angeben, werden diese in der angegebenen Reihenfolge abgefragt. Beispielsweise werden für die Liste real_time,background
keine Jobs aus background
übernommen, es sei denn, es warten keine weiteren Jobs darauf real_time
.
Active Job unterstützt auch positive Ganzzahlprioritäten beim Einreihen von Jobs in die Warteschlange. In Solid Queue gilt: Je kleiner der Wert, desto höher die Priorität. Der Standardwert ist 0
.
Dies ist nützlich, wenn Sie Jobs mit unterschiedlicher Wichtigkeit oder Dringlichkeit in derselben Warteschlange ausführen. Innerhalb derselben Warteschlange werden Jobs in der Reihenfolge ihrer Priorität ausgewählt, aber in einer Liste von Warteschlangen hat die Warteschlangenreihenfolge Vorrang, sodass im vorherigen Beispiel mit real_time,background
Jobs in der real_time
Warteschlange vor Jobs im background
ausgewählt werden Warteschlange, auch wenn für diejenigen in der background
eine höhere Priorität (kleinerer Wert) festgelegt ist.
Wir empfehlen, die Reihenfolge der Warteschlangen nicht mit den Prioritäten zu verwechseln, sondern entweder die eine oder die andere zu wählen, da dies die Reihenfolge der Auftragsausführung für Sie einfacher macht.
Um die Polling-Leistung aufrechtzuerhalten und sicherzustellen, dass immer ein abdeckender Index verwendet wird, führt Solid Queue nur zwei Arten von Polling-Abfragen durch:
-- No filtering by queue
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
-- Filtering by a single queue
SELECT job_id
FROM solid_queue_ready_executions
WHERE queue_name = ?
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
Die erste Option (keine Filterung nach Warteschlange) wird verwendet, wenn Sie dies angeben
queues : *
und es werden keine Warteschlangen pausiert, da wir alle Warteschlangen ansprechen möchten.
In anderen Fällen benötigen wir eine Liste der Warteschlangen, nach denen wir der Reihe nach filtern können, da wir immer nur nach einer einzelnen Warteschlange filtern können, um sicherzustellen, dass wir zum Sortieren einen Index verwenden. Das bedeutet, wenn Sie Ihre Warteschlangen wie folgt angeben:
queues : beta*
Wir müssen zuerst eine Liste aller vorhandenen Warteschlangen abrufen, die diesem Präfix entsprechen, mit einer Abfrage, die wie folgt aussehen würde:
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
WHERE queue_name LIKE ' beta% ' ;
Diese Art von DISTINCT
-Abfrage für eine Spalte, die in einem Index ganz links steht, kann in MySQL dank einer Technik namens „Loose Index Scan“ sehr schnell durchgeführt werden. PostgreSQL und SQLite implementieren diese Technik jedoch nicht. Das bedeutet, dass diese Abfrage langsam wird, wenn Ihre Tabelle solid_queue_ready_executions
sehr groß ist, weil Ihre Warteschlangen sehr lang werden. Normalerweise ist Ihre Tabelle solid_queue_ready_executions
klein, aber das kann passieren.
Ähnlich wie bei der Verwendung von Präfixen passiert das Gleiche, wenn Sie Warteschlangen angehalten haben, da wir eine Liste aller Warteschlangen mit einer Abfrage wie erhalten müssen
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
und dann die pausierten entfernen. Pausieren sollte im Allgemeinen etwas Seltenes sein, nur unter besonderen Umständen und für einen kurzen Zeitraum. Wenn Sie Aufträge aus einer Warteschlange nicht mehr verarbeiten möchten, können Sie dies am besten tun, indem Sie sie aus Ihrer Warteschlangenliste entfernen.
Zusammenfassend lässt sich sagen: Wenn Sie eine optimale Leistung bei der Abfrage sicherstellen möchten , ist es am besten, immer genaue Namen für sie anzugeben und keine Warteschlangen anzuhalten.
Gehen Sie folgendermaßen vor:
queues : background, backend
stattdessen:
queues : back*
Worker in Solid Queue verwenden einen Thread-Pool, um Arbeit in mehreren Threads auszuführen, konfigurierbar über den threads
-Parameter oben. Darüber hinaus kann Parallelität durch mehrere Prozesse auf einer Maschine (konfigurierbar über verschiedene Worker oder die oben genannten processes
) oder durch horizontale Skalierung erreicht werden.
Der Vorgesetzte ist für die Steuerung dieser Prozesse verantwortlich und reagiert auf folgende Signale:
TERM
, INT
: startet die ordnungsgemäße Beendigung. Der Supervisor sendet ein TERM
Signal an seine überwachten Prozesse und wartet bis zur SolidQueue.shutdown_timeout
-Zeit, bis sie abgeschlossen sind. Wenn bis dahin noch überwachte Prozesse vorhanden sind, wird ihnen ein QUIT
-Signal gesendet, um anzuzeigen, dass sie beendet werden müssen.QUIT
: Startet die sofortige Beendigung. Der Supervisor sendet ein QUIT
Signal an seine überwachten Prozesse, wodurch diese sofort beendet werden. Wenn Arbeiter beim Empfang eines QUIT
Signals noch Jobs in Bearbeitung haben, werden diese beim Abmelden der Prozesse in die Warteschlange zurückgebracht.
Wenn Prozesse vor dem Beenden keine Chance haben, sich zu bereinigen (z. B. wenn jemand irgendwo an einem Kabel zieht), bleiben Aufträge im laufenden Betrieb möglicherweise weiterhin von den Prozessen beansprucht, die sie ausführen. Prozesse senden Heartbeats, und der Supervisor prüft und bereinigt Prozesse mit abgelaufenen Heartbeats, wodurch alle beanspruchten Jobs wieder in ihre Warteschlangen freigegeben werden. Sie können sowohl die Häufigkeit der Heartbeats als auch den Schwellenwert konfigurieren, um einen Prozess als tot zu betrachten. Siehe dazu den Abschnitt unten.
Sie können die von Solid Queue verwendete Datenbank über die Option config.solid_queue.connects_to
in den Konfigurationsdateien config/application.rb
oder config/environments/production.rb
konfigurieren. Standardmäßig wird zum Schreiben und Lesen eine einzige Datenbank namens queue
verwendet, die der Datenbankkonfiguration entspricht, die Sie während der Installation eingerichtet haben.
Hier können alle Möglichkeiten von Active Record für mehrere Datenbanken genutzt werden.
In der Solid-Warteschlange können Sie an zwei verschiedenen Punkten im Leben des Vorgesetzten teilnehmen:
start
: nachdem der Supervisor den Bootvorgang abgeschlossen hat und kurz bevor er Arbeiter und Disponenten trennt.stop
: nach dem Empfang eines Signals ( TERM
, INT
oder QUIT
) und unmittelbar vor dem ordnungsgemäßen oder sofortigen Herunterfahren.Und in zwei verschiedene Punkte im Leben eines Arbeiters:
worker_start
: nachdem der Worker den Bootvorgang abgeschlossen hat und kurz bevor er die Abfrageschleife startet.worker_stop
: nach dem Empfang eines Signals ( TERM
, INT
oder QUIT
) und direkt vor dem ordnungsgemäßen oder sofortigen Herunterfahren (was einfach nur exit!
).Sie können dazu die folgenden Methoden mit einem Block verwenden:
SolidQueue . on_start
SolidQueue . on_stop
SolidQueue . on_worker_start
SolidQueue . on_worker_stop
Zum Beispiel:
SolidQueue . on_start { start_metrics_server }
SolidQueue . on_stop { stop_metrics_server }
Diese können mehrmals aufgerufen werden, um mehrere Hooks hinzuzufügen. Dies muss jedoch geschehen, bevor Solid Queue gestartet wird. Ein Initialisierer wäre hierfür ein guter Ort.
Hinweis : Die Einstellungen in diesem Abschnitt sollten in Ihrer config/application.rb
oder Ihrer Umgebungskonfiguration wie folgt festgelegt werden: config.solid_queue.silence_polling = true
Es gibt mehrere Einstellungen, die die Funktionsweise von Solid Queue steuern und die Sie ebenfalls festlegen können:
logger
: Der Logger, den Solid Queue verwenden soll. Standardmäßig wird der App-Logger verwendet.
app_executor
: Der Rails-Executor, der zum Umschließen asynchroner Vorgänge verwendet wird, standardmäßig der App-Executor
on_thread_error
: Benutzerdefiniertes Lambda/Proc, das aufgerufen wird, wenn in einem Solid Queue-Thread ein Fehler auftritt, der die ausgelöste Ausnahme als Argument verwendet. Standardmäßig ist
-> ( exception ) { Rails . error . report ( exception , handled : false ) }
Dies wird nicht für Fehler verwendet, die innerhalb einer Jobausführung auftreten . Fehler, die in Jobs auftreten, werden von retry_on
oder discard_on
des aktiven Jobs behandelt und führen letztendlich zu fehlgeschlagenen Jobs. Dies gilt für Fehler, die innerhalb von Solid Queue selbst auftreten.
use_skip_locked
: Gibt an, ob FOR UPDATE SKIP LOCKED
beim Durchführen von Sperrlesevorgängen verwendet werden soll. Dies wird in Zukunft automatisch erkannt und Sie müssen dies vorerst nur dann auf false
setzen, wenn Ihre Datenbank dies nicht unterstützt. Für MySQL wären das Versionen < 8 und für PostgreSQL Versionen < 9.5. Wenn Sie SQLite verwenden, hat dies keine Auswirkung, da die Schreibvorgänge sequentiell erfolgen.
process_heartbeat_interval
: Das Heartbeat-Intervall, dem alle Prozesse folgen – standardmäßig 60 Sekunden.
process_alive_threshold
: Wie lange soll gewartet werden, bis ein Prozess nach seinem letzten Heartbeat als tot gilt – standardmäßig sind es 5 Minuten.
shutdown_timeout
: Zeit, die der Supervisor wartet, seit er das TERM
-Signal an seine überwachten Prozesse gesendet hat, bevor er eine QUIT
-Version an sie sendet, die eine sofortige Beendigung anfordert – standardmäßig 5 Sekunden.
silence_polling
: Gibt an, ob Active Record-Protokolle stummgeschaltet werden sollen, die bei der Abfrage von Workern und Dispatchern ausgegeben werden – der Standardwert ist true
.
supervisor_pidfile
: Pfad zu einer PID-Datei, die der Supervisor beim Booten erstellt, um zu verhindern, dass mehr als ein Supervisor auf demselben Host ausgeführt wird, oder für den Fall, dass Sie sie für eine Gesundheitsprüfung verwenden möchten. Standardmäßig ist es nil
.
preserve_finished_jobs
: Gibt an, ob abgeschlossene Jobs in der Tabelle solid_queue_jobs
verbleiben sollen – der Standardwert ist true
.
clear_finished_jobs_after
: Zeitraum, um abgeschlossene Jobs aufzubewahren, falls preserve_finished_jobs
wahr ist – standardmäßig 1 Tag. Hinweis: Derzeit gibt es keine automatische Bereinigung abgeschlossener Jobs. Sie müssten dies tun, indem Sie regelmäßig SolidQueue::Job.clear_finished_in_batches
aufrufen, aber dies wird in naher Zukunft automatisch geschehen.
default_concurrency_control_period
: Der Wert, der als Standard für den duration
in Parallelitätskontrollen verwendet werden soll. Der Standardwert beträgt 3 Minuten.
Solid Queue löst einen SolidQueue::Job::EnqueueError
für alle Active Record-Fehler aus, die beim Einreihen eines Jobs in die Warteschlange auftreten. Der Grund dafür, dass ActiveJob::EnqueueError
nicht ausgelöst wird, liegt darin, dass dieser von Active Job behandelt wird, was dazu führt, dass perform_later
false
zurückgibt und job.enqueue_error
setzt, wodurch der Job an einen Block übergeben wird, den Sie an perform_later
übergeben müssen. Dies funktioniert sehr gut für Ihre eigenen Jobs, macht es jedoch sehr schwierig, Fehler bei Jobs zu behandeln, die von Rails oder anderen Gems in die Warteschlange gestellt werden, wie z. B. Turbo::Streams::BroadcastJob
oder ActiveStorage::AnalyzeJob
, da Sie den Aufruf von perform_later
nicht steuern in diesen Fällen.
Wenn bei wiederkehrenden Aufgaben ein solcher Fehler beim Einreihen des der Aufgabe entsprechenden Jobs in die Warteschlange auftritt, wird er behandelt und protokolliert, es kommt jedoch nicht zu einer Blasenbildung.
Solid Queue erweitert Active Job um Parallelitätskontrollen, mit denen Sie begrenzen können, wie viele Jobs eines bestimmten Typs oder mit bestimmten Argumenten gleichzeitig ausgeführt werden können. Bei einer solchen Einschränkung wird die Ausführung von Jobs blockiert und sie bleiben blockiert, bis ein anderer Job beendet wird und die Blockierung aufhebt oder bis die festgelegte Ablaufzeit ( Dauer des Parallelitätslimits) abgelaufen ist. Jobs werden niemals verworfen oder gehen verloren, sondern nur blockiert.
class MyJob < ApplicationJob
limits_concurrency to : max_concurrent_executions , key : -> ( arg1 , arg2 , ** ) { ... } , duration : max_interval_to_guarantee_concurrency_limit , group : concurrency_group
# ...
key
ist der einzige erforderliche Parameter und kann ein Symbol, eine Zeichenfolge oder ein Prozess sein, der die Jobargumente als Parameter empfängt und zur Identifizierung der Jobs verwendet wird, die zusammen begrenzt werden müssen. Wenn der Prozess einen Active Record-Datensatz zurückgibt, wird der Schlüssel aus seinem Klassennamen und id
erstellt.to
ist standardmäßig 1
.duration
ist standardmäßig auf SolidQueue.default_concurrency_control_period
eingestellt, was selbst standardmäßig 3 minutes
beträgt, aber auch konfiguriert werden kann.group
wird verwendet, um die Parallelität verschiedener Jobklassen gemeinsam zu steuern. Standardmäßig wird der Name der Jobklasse verwendet. Wenn ein Job diese Steuerelemente enthält, stellen wir sicher, dass höchstens die Anzahl der Jobs (angegeben to
), die denselben key
ergeben, gleichzeitig ausgeführt werden, und diese Garantie gilt für duration
für jeden in die Warteschlange gestellten Job. Beachten Sie, dass es keine Garantie für die Ausführungsreihenfolge gibt, sondern nur dafür, dass Jobs gleichzeitig ausgeführt werden (überlappend).
Die Parallelitätsgrenzen verwenden beim Einreihen in die Warteschlange das Konzept von Semaphoren und funktionieren wie folgt: Wenn ein Job in die Warteschlange gestellt wird, prüfen wir, ob er Parallelitätskontrollen angibt. Wenn dies der Fall ist, überprüfen wir das Semaphor auf den berechneten Parallelitätsschlüssel. Wenn das Semaphor geöffnet ist, beanspruchen wir es und setzen den Job auf „bereit“ . „Bereit“ bedeutet, dass es von den Arbeitern zur Ausführung abgeholt werden kann. Wenn die Ausführung des Jobs abgeschlossen ist (sei es erfolgreich oder erfolglos, was zu einer fehlgeschlagenen Ausführung führt), signalisieren wir dem Semaphor und versuchen, die Blockierung des nächsten Jobs mit demselben Schlüssel (falls vorhanden) aufzuheben. Das Entsperren des nächsten Jobs bedeutet nicht, dass dieser Job sofort ausgeführt wird, sondern dass er von „Blockiert“ in „Bereit“ verschoben wird. Da etwas passieren kann, das verhindert, dass der erste Job das Semaphor freigibt und den nächsten Job entsperrt (z. B. wenn jemand einen Stecker in der Maschine zieht, auf der der Worker läuft), haben wir die duration
als Ausfallsicherung. Jobs, die länger als die Dauer blockiert waren, sind Kandidaten für die Freigabe, allerdings nur so viele davon, wie die Parallelitätsregeln zulassen, da jeder einzelne den Semaphore-Dance-Check durchlaufen müsste. Das bedeutet, dass es bei der duration
nicht wirklich um den Job geht, der in der Warteschlange steht oder ausgeführt wird, sondern um die Jobs, die blockiert und warten.
Zum Beispiel:
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to : 2 , key : -> ( contact ) { contact . account } , duration : 5 . minutes
def perform ( contact )
# ...
Wobei contact
und account
ActiveRecord
Datensätze sind. In diesem Fall stellen wir sicher, dass höchstens zwei Jobs der Art DeliverAnnouncementToContact
für dasselbe Konto gleichzeitig ausgeführt werden. Wenn einer dieser Jobs aus irgendeinem Grund länger als 5 Minuten dauert oder seine Parallelitätssperre (signalisiert das Semaphor) nicht innerhalb von 5 Minuten nach dem Erwerb aufhebt, kann ein neuer Job mit demselben Schlüssel die Sperre erhalten.
Sehen wir uns ein weiteres Beispiel für die Verwendung group
an:
class Box :: MovePostingsByContactToDesignatedBoxJob < ApplicationJob
limits_concurrency key : -> ( contact ) { contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( contact )
# ...
class Bundle :: RebundlePostingsJob < ApplicationJob
limits_concurrency key : -> ( bundle ) { bundle . contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( bundle )
# ...
Wenn in diesem Fall ein Box::MovePostingsByContactToDesignatedBoxJob
-Job für einen Kontaktdatensatz mit der ID 123
und ein anderer Bundle::RebundlePostingsJob
-Job gleichzeitig für einen Bundle-Datensatz in die Warteschlange gestellt werden, der auf den Kontakt 123
verweist, darf nur einer von ihnen fortfahren. Der andere bleibt blockiert, bis der erste fertig ist (oder 15 Minuten vergehen, je nachdem, was zuerst passiert).
Beachten Sie, dass die Einstellung duration
indirekt vom Wert für concurrency_maintenance_interval
abhängt, den Sie für Ihre Dispatcher festlegen, da dies die Häufigkeit ist, mit der blockierte Jobs überprüft und entsperrt werden. Im Allgemeinen sollten Sie duration
so festlegen, dass alle Ihre Jobs innerhalb dieser Dauer abgeschlossen werden, und die Aufgabe zur Aufrechterhaltung der Parallelität als Ausfallsicherung für den Fall betrachten, dass etwas schief geht.
Jobs werden in der Reihenfolge ihrer Priorität entsperrt, die Warteschlangenreihenfolge wird jedoch beim Entsperren von Jobs nicht berücksichtigt. Das heißt, wenn Sie über eine Gruppe von Jobs verfügen, die eine Parallelitätsgruppe teilen, sich aber in verschiedenen Warteschlangen befinden, oder über Jobs derselben Klasse, die Sie in verschiedene Warteschlangen einreihen, wird die Warteschlangenreihenfolge, die Sie für einen Worker festgelegt haben, beim Aufheben der Blockierung nicht berücksichtigt diejenigen. Der Grund dafür ist, dass ein Job, der ausgeführt wird, die Blockierung des nächsten aufhebt und der Job selbst nichts über die Warteschlangenreihenfolge eines bestimmten Arbeiters weiß (es könnten sogar verschiedene Arbeiter mit unterschiedlichen Warteschlangenreihenfolgen sein), er kann nur über die Priorität Bescheid wissen. Sobald blockierte Jobs entsperrt sind und zur Abfrage verfügbar sind, werden sie von einem Mitarbeiter entsprechend der Warteschlangenreihenfolge abgeholt.
Schließlich funktionieren fehlgeschlagene Jobs, die automatisch oder manuell wiederholt werden, auf die gleiche Weise wie neue Jobs, die in die Warteschlange gestellt werden: Sie werden in die Warteschlange gestellt, um ein offenes Semaphor zu erhalten, und wann immer sie es erhalten, werden sie ausgeführt. Es spielt keine Rolle, ob sie bereits in der Vergangenheit ein offenes Semaphor erhalten haben.
Solid Queue enthält keinen automatischen Wiederholungsmechanismus und verlässt sich hierfür auf Active Job. Fehlgeschlagene Jobs bleiben im System erhalten und für diese wird eine fehlgeschlagene Ausführung (ein Datensatz in der Tabelle solid_queue_failed_executions
) erstellt. Der Job bleibt dort, bis er manuell verworfen oder erneut in die Warteschlange gestellt wird. Sie können dies in einer Konsole wie folgt tun:
failed_execution = SolidQueue :: FailedExecution . find ( ... ) # Find the failed execution related to your job
failed_execution . error # inspect the error
failed_execution . retry # This will re-enqueue the job as if it was enqueued for the first time
failed_execution . discard # This will delete the job from the system
Wir empfehlen jedoch einen Blick auf mission_control-jobs, ein Dashboard, in dem Sie unter anderem fehlgeschlagene Jobs überprüfen und erneut versuchen/verwerfen können.
Einige Fehlerverfolgungsdienste, die in Rails integriert sind, wie Sentry oder Rollbar, binden sich an Active Job ein und melden automatisch nicht behandelte Fehler, die während der Jobausführung auftreten. Wenn Ihr Fehlerverfolgungssystem dies jedoch nicht unterstützt oder Sie benutzerdefinierte Berichte benötigen, können Sie sich selbst in Active Job einbinden. Ein möglicher Weg, dies zu tun, wäre:
# application_job.rb
class ApplicationJob < ActiveJob :: Base
rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Beachten Sie, dass Sie die obige Logik auch auf ActionMailer::MailDeliveryJob
duplizieren müssen. Das liegt daran, dass ActionMailer
nicht von ApplicationJob
erbt, sondern stattdessen ActionMailer::MailDeliveryJob
verwendet, das von ActiveJob::Base
erbt.
# application_mailer.rb
class ApplicationMailer < ActionMailer :: Base
ActionMailer :: MailDeliveryJob . rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Wir stellen ein Puma-Plugin zur Verfügung, wenn Sie den Supervisor der Solid Queue zusammen mit Puma ausführen und ihn von Puma überwachen und verwalten lassen möchten. Sie müssen nur hinzufügen
plugin :solid_queue
zu Ihrer puma.rb
-Konfiguration hinzufügen.
Da dies recht knifflig sein kann und viele Benutzer sich darüber keine Sorgen machen müssen, ist Solid Queue standardmäßig in einer anderen Datenbank als die Hauptanwendung konfiguriert. Dank der integrierten Funktion von Active Job wird das Einreihen von Jobs in die Warteschlange verschoben, bis eine laufende Transaktion festgeschrieben wird Fähigkeit, dies zu tun. Das bedeutet, dass Sie diese Transaktionsintegrität nicht nutzen, selbst wenn Sie Solid Queue in derselben Datenbank wie Ihre App ausführen.
Wenn Sie dies lieber ändern möchten, können Sie config.active_job.enqueue_after_transaction_commit
auf never
setzen. Sie können dies auch pro Auftrag festlegen.
Wenn Sie dies auf never
“ festlegen, aber dennoch sicherstellen möchten, dass Sie nicht versehentlich die Transaktionsintegrität beeinträchtigen, können Sie Folgendes sicherstellen:
Ihre Jobs, die auf bestimmten Daten basieren, werden immer bei after_commit
-Rückrufen oder auf andere Weise von einem Ort in die Warteschlange gestellt, an dem Sie sicher sind, dass alle Daten, die der Job verwenden wird, in die Datenbank übernommen wurden, bevor der Job in die Warteschlange gestellt wird.
Oder Sie konfigurieren eine andere Datenbank für Solid Queue, auch wenn diese mit Ihrer App identisch ist, und stellen so sicher, dass eine andere Verbindung im Thread, der Anforderungen verarbeitet oder Jobs für Ihre App ausführt, zum Einreihen von Jobs in die Warteschlange verwendet wird. Zum Beispiel:
class ApplicationRecord < ActiveRecord :: Base
self . abstract_class = true
connects_to database : { writing : :primary , reading : :replica }
config . solid_queue . connects_to = { database : { writing : :primary , reading : :replica } }
Solid Queue unterstützt die Definition wiederkehrender Aufgaben, die zu bestimmten Zeitpunkten in der Zukunft regelmäßig ausgeführt werden, wie z. B. Cron-Jobs. Diese werden vom Scheduler-Prozess verwaltet und in einer eigenen Konfigurationsdatei definiert. Standardmäßig befindet sich die Datei in config/recurring.yml
, Sie können jedoch einen anderen Pfad festlegen, indem Sie die Umgebungsvariable SOLID_QUEUE_RECURRING_SCHEDULE
verwenden oder die Option --recurring_schedule_file
mit bin/jobs
verwenden, etwa so:
bin/jobs --recurring_schedule_file=config/schedule.yml
Die Konfiguration selbst sieht so aus:
production :
a_periodic_job :
class : MyJob
args : [ 42, { status: "custom_status" } ]
schedule : every second
a_cleanup_task :
command : " DeletedStuff.clear_all "
schedule : every day at 9am
Aufgaben werden als Hash/Wörterbuch angegeben, wobei der Schlüssel intern der Schlüssel der Aufgabe ist. Jede Aufgabe muss entweder eine class
haben, die die in die Warteschlange gestellte Jobklasse ist, oder einen command
, der im Kontext eines Jobs ( SolidQueue::RecurringJob
) ausgewertet wird, der gemäß seinem Zeitplan in die Warteschlange gestellt wird die solid_queue_recurring
Warteschlange.
Jede Aufgabe muss auch einen Zeitplan haben, der mit Fugit analysiert wird, damit alles akzeptiert wird, was Fugit als Cron akzeptiert. Optional können Sie für jede Aufgabe Folgendes angeben:
args
: die Argumente, die an den Job übergeben werden sollen, als einzelnes Argument, als Hash oder als Array von Argumenten, das auch kwargs als letztes Element im Array enthalten kann.Der Job in der obigen Beispielkonfiguration wird jede Sekunde wie folgt in die Warteschlange gestellt:
MyJob . perform_later ( 42 , status : "custom_status" )
queue
: Eine andere Warteschlange, die beim Einreihen des Jobs in die Warteschlange verwendet werden soll. Wenn keine vorhanden ist, wird die Warteschlange für die Jobklasse eingerichtet.
priority
: ein numerischer Prioritätswert, der beim Einreihen des Jobs in die Warteschlange verwendet wird.
Aufgaben werden vom Planer zu den entsprechenden Zeiten in die Warteschlange gestellt, und jede Aufgabe plant die nächste. Dies ist ziemlich stark von dem inspiriert, was GoodJob tut.
Es ist möglich, mehrere Planer mit derselben recurring_tasks
-Konfiguration auszuführen, wenn Sie beispielsweise aus Redundanzgründen über mehrere Server verfügen und den scheduler
auf mehr als einem von ihnen ausführen. Um zu vermeiden, dass doppelte Aufgaben gleichzeitig in die Warteschlange gestellt werden, wird in derselben Transaktion, in der der Job in die Warteschlange gestellt wird, ein Eintrag in einer neuen Tabelle solid_queue_recurring_executions
erstellt. Diese Tabelle verfügt über einen eindeutigen Index für task_key
und run_at
, wodurch sichergestellt wird, dass nur ein Eintrag pro Aufgabe und Zeit erstellt wird. Dies funktioniert nur, wenn Sie preserve_finished_jobs
auf true
(Standardeinstellung) gesetzt haben und die Garantie gilt, solange Sie die Jobs behalten.
Hinweis : Ein einzelner wiederkehrender Zeitplan wird unterstützt, sodass Sie mehrere Planer haben können, die denselben Zeitplan verwenden, aber nicht mehrere Planer, die unterschiedliche Konfigurationen verwenden.
Schließlich ist es möglich, Jobs zu konfigurieren, die nicht von Solid Queue verarbeitet werden. Das heißt, Sie können einen Job wie diesen in Ihrer App haben:
class MyResqueJob < ApplicationJob
self . queue_adapter = :resque
def perform ( arg )
# ..
end
end
Sie können dies weiterhin in Solid Queue konfigurieren:
my_periodic_resque_job :
class : MyResqueJob
args : 22
schedule : " */5 * * * * "
und der Job wird über perform_later
in die Warteschlange gestellt, damit er in Resque ausgeführt wird. Allerdings verfolgen wir in diesem Fall keinen solid_queue_recurring_execution
Datensatz dafür und es gibt keine Garantie dafür, dass der Job jedes Mal nur einmal in die Warteschlange gestellt wird.
Solid Queue wurde von resque und GoodJob inspiriert. Wir empfehlen Ihnen, sich diese Projekte anzusehen, da es sich um großartige Beispiele handelt, aus denen wir viel gelernt haben.
Das Juwel ist als Open Source unter den Bedingungen der MIT-Lizenz verfügbar.