Solid Queue est un backend de file d'attente basé sur une base de données pour Active Job, conçu dans un souci de simplicité et de performances.
Outre la mise en file d'attente et le traitement réguliers des tâches, Solid Queue prend en charge les tâches retardées, les contrôles de concurrence, les tâches récurrentes, les files d'attente en pause, les priorités numériques par tâche, les priorités par ordre de file d'attente et la mise en file d'attente en masse ( enqueue_all
pour perform_all_later
d'Active Job).
Solid Queue peut être utilisé avec des bases de données SQL telles que MySQL, PostgreSQL ou SQLite, et exploite la clause FOR UPDATE SKIP LOCKED
, si disponible, pour éviter de bloquer et d'attendre des verrous lors de l'interrogation des tâches. Il s'appuie sur Active Job pour les tentatives, les suppressions, la gestion des erreurs, la sérialisation ou les retards, et il est compatible avec le multithread de Ruby on Rails.
Solid Queue est configuré par défaut dans les nouvelles applications Rails 8. Mais si vous utilisez une version antérieure, vous pouvez l'ajouter manuellement en suivant ces étapes :
bundle add solid_queue
bin/rails solid_queue:install
Cela configurera Solid Queue comme backend de production Active Job, créera les fichiers de configuration config/queue.yml
et config/recurring.yml
, et créera le db/queue_schema.rb
. Il créera également un wrapper exécutable bin/jobs
que vous pourrez utiliser pour démarrer Solid Queue.
Une fois que vous avez fait cela, vous devrez alors ajouter la configuration de la base de données de file d'attente dans config/database.yml
. Si vous utilisez SQLite, cela ressemblera à ceci :
production :
primary :
<< : *default
database : storage/production.sqlite3
queue :
<< : *default
database : storage/production_queue.sqlite3
migrations_paths : db/queue_migrate
...ou si vous utilisez MySQL/PostgreSQL/Trilogy :
production :
primary : &primary_production
<< : *default
database : app_production
username : app
password : <%= ENV["APP_DATABASE_PASSWORD"] %>
queue :
<< : *primary_production
database : app_production_queue
migrations_paths : db/queue_migrate
Remarque : L'appel de bin/rails solid_queue:install
ajoutera automatiquement config.solid_queue.connects_to = { database: { writing: :queue } }
à config/environments/production.rb
, donc aucune configuration supplémentaire n'est nécessaire ici (même si vous devez vous assurer que vous utilisez le nom queue
dans database.yml
pour que cela corresponde !). Mais si vous souhaitez utiliser Solid Queue dans un environnement différent (comme la préparation ou même le développement), vous devrez ajouter manuellement cette ligne config.solid_queue.connects_to
au fichier d'environnement respectif. Et, comme toujours, assurez-vous que le nom que vous utilisez pour la base de données dans config/database.yml
correspond au nom que vous utilisez dans config.solid_queue.connects_to
.
Exécutez ensuite db:prepare
en production pour vous assurer que la base de données est créée et que le schéma est chargé.
Vous êtes maintenant prêt à commencer à traiter les tâches en exécutant bin/jobs
sur le serveur qui effectue le travail. Cela commencera à traiter les tâches dans toutes les files d'attente en utilisant la configuration par défaut. Voir ci-dessous pour en savoir plus sur la configuration de Solid Queue.
Pour les petits projets, vous pouvez exécuter Solid Queue sur la même machine que votre serveur Web. Lorsque vous êtes prêt à évoluer, Solid Queue prend en charge la mise à l'échelle horizontale prête à l'emploi. Vous pouvez exécuter Solid Queue sur un serveur distinct de votre serveur Web, ou même exécuter bin/jobs
sur plusieurs machines en même temps. Selon la configuration, vous pouvez désigner certaines machines pour exécuter uniquement des répartiteurs ou uniquement des travailleurs. Voir la section configuration pour plus de détails à ce sujet.
Remarque : les futures modifications du schéma se feront sous la forme de migrations régulières.
Il est recommandé d'exécuter Solid Queue dans une base de données distincte, mais il est également possible d'utiliser une seule base de données pour l'application et la file d'attente. Suivez simplement ces étapes :
db/queue_schema.rb
dans une migration normale et supprimez db/queue_schema.rb
config.solid_queue.connects_to
de production.rb
bin/jobs
Vous n'aurez pas plusieurs bases de données, donc database.yml
n'a pas besoin d'avoir une base de données principale et une base de données de file d'attente.
Si vous envisagez d'adopter Solid Queue progressivement en changeant de tâche à la fois, vous pouvez le faire en laissant le config.active_job.queue_adapter
défini sur votre ancien backend, puis en définissant le queue_adapter
directement dans les tâches que vous déplacez :
# app/jobs/my_job.rb
class MyJob < ApplicationJob
self . queue_adapter = :solid_queue
# ...
end
Solid Queue a été conçu pour le débit le plus élevé lorsqu'il est utilisé avec MySQL 8+ ou PostgreSQL 9.5+, car ils prennent en charge FOR UPDATE SKIP LOCKED
. Vous pouvez l'utiliser avec des versions plus anciennes, mais dans ce cas, vous risquez de rencontrer des attentes de verrouillage si vous exécutez plusieurs nœuds de calcul pour la même file d'attente. Vous pouvez également l'utiliser avec SQLite sur des applications plus petites.
Nous avons plusieurs types d'acteurs dans Solid Queue :
solid_queue_ready_executions
.solid_queue_scheduled_executions
vers la table solid_queue_ready_executions
afin que les travailleurs puissent les récupérer. En plus de cela, ils effectuent des travaux de maintenance liés aux contrôles de concurrence.Le superviseur de Solid Queue établira un processus distinct pour chaque travailleur/répartiteur/planificateur supervisé.
Par défaut, Solid Queue essaiera de trouver votre configuration sous config/queue.yml
, mais vous pouvez définir un chemin différent en utilisant la variable d'environnement SOLID_QUEUE_CONFIG
ou en utilisant l'option -c/--config_file
avec bin/jobs
, comme ceci :
bin/jobs -c config/calendar.yml
Voici à quoi ressemble cette configuration :
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
workers :
- queues : " * "
threads : 3
polling_interval : 2
- queues : [ real_time, background ]
threads : 5
polling_interval : 0.1
processes : 3
Tout est facultatif. Si aucune configuration n'est fournie, Solid Queue fonctionnera avec un répartiteur et un travailleur avec les paramètres par défaut. Si vous souhaitez exécuter uniquement des répartiteurs ou des travailleurs, il vous suffit d'inclure cette section seule dans la configuration. Par exemple, avec la configuration suivante :
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
le superviseur dirigera 1 répartiteur et aucun travailleur.
Voici un aperçu des différentes options :
polling_interval
: l'intervalle de temps en secondes pendant lequel les travailleurs et les répartiteurs attendront avant de rechercher d'autres tâches. Ce temps est par défaut de 1
seconde pour les répartiteurs et 0.1
seconde pour les travailleurs.
batch_size
: le répartiteur répartira les tâches par lots de cette taille. La valeur par défaut est 500.
concurrency_maintenance_interval
: l'intervalle de temps en secondes pendant lequel le répartiteur attendra avant de vérifier les tâches bloquées qui peuvent être débloquées. En savoir plus sur les contrôles de concurrence pour en savoir plus sur ce paramètre. La valeur par défaut est 600
secondes.
queues
: la liste des files d'attente dans lesquelles les travailleurs sélectionneront les tâches. Vous pouvez utiliser *
pour indiquer toutes les files d'attente (ce qui est également la valeur par défaut et le comportement que vous obtiendrez si vous l'omettez). Vous pouvez fournir une seule file d'attente ou une liste de files d'attente sous forme de tableau. Les tâches seront interrogées dans ces files d'attente dans l'ordre, ainsi par exemple, avec [ real_time, background ]
, aucune tâche ne sera extraite de background
à moins qu'il n'y ait plus de tâches en attente en real_time
. Vous pouvez également fournir un préfixe avec un caractère générique pour faire correspondre les files d'attente commençant par un préfixe. Par exemple:
staging :
workers :
- queues : staging*
threads : 3
polling_interval : 5
Cela créera un travailleur récupérant les tâches de toutes les files d'attente en commençant par staging
. Le caractère générique *
n'est autorisé que seul ou à la fin d'un nom de file d'attente ; vous ne pouvez pas spécifier de noms de file d'attente tels que *_some_queue
. Ceux-ci seront ignorés.
Enfin, vous pouvez combiner des préfixes avec des noms exacts, comme [ staging*, background ]
, et le comportement en ce qui concerne l'ordre sera le même qu'avec des noms exacts uniquement.
Consultez les sections ci-dessous pour savoir comment l'ordre des files d'attente se comporte en combinaison avec les priorités et comment la manière dont vous spécifiez les files d'attente par travailleur peut affecter les performances.
threads
: c'est la taille maximale du pool de threads dont chaque travailleur devra exécuter des travaux. Chaque travailleur récupérera ce nombre de tâches au maximum dans sa ou ses files d'attente et les publiera dans le pool de threads pour être exécuté. Par défaut, c'est 3
. Seuls les travailleurs disposent de ce paramètre.
processes
: c'est le nombre de processus de travail qui seront lancés par le superviseur avec les paramètres donnés. Par défaut, c'est 1
, juste un seul processus. Ce paramètre est utile si vous souhaitez consacrer plusieurs cœurs de processeur à une ou plusieurs files d'attente avec la même configuration. Seuls les travailleurs disposent de ce paramètre.
concurrency_maintenance
: indique si le répartiteur effectuera le travail de maintenance de la concurrence. Cela est true
par défaut, et c'est utile si vous n'utilisez aucun contrôle de concurrence et souhaitez le désactiver ou si vous exécutez plusieurs répartiteurs et souhaitez que certains d'entre eux répartissent simplement les tâches sans rien faire d'autre.
Comme mentionné ci-dessus, si vous spécifiez une liste de files d'attente pour un travailleur, celles-ci seront interrogées dans l'ordre indiqué, comme pour la liste real_time,background
, aucune tâche ne sera supprimée de background
à moins qu'il n'y ait plus de tâches en attente. real_time
.
Active Job prend également en charge les priorités entières positives lors de la mise en file d'attente des tâches. Dans Solid Queue, plus la valeur est petite, plus la priorité est élevée. La valeur par défaut est 0
.
Ceci est utile lorsque vous exécutez des tâches avec une importance ou une urgence différente dans la même file d'attente. Dans la même file d'attente, les tâches seront sélectionnées par ordre de priorité, mais dans une liste de files d'attente, l'ordre de la file d'attente est prioritaire, donc dans l'exemple précédent avec real_time,background
, les tâches dans la file d'attente real_time
seront sélectionnées avant les tâches en background
file d'attente, même si ceux de la file d'attente background
ont une priorité plus élevée (valeur plus petite).
Nous vous recommandons de ne pas mélanger l'ordre de la file d'attente avec les priorités, mais de choisir l'un ou l'autre, car cela rendra l'ordre d'exécution des tâches plus simple pour vous.
Pour que l'interrogation reste performante et garantit qu'un index de couverture est toujours utilisé, Solid Queue n'effectue que deux types de requêtes d'interrogation :
-- No filtering by queue
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
-- Filtering by a single queue
SELECT job_id
FROM solid_queue_ready_executions
WHERE queue_name = ?
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
Le premier (pas de filtrage par file d'attente) est utilisé lorsque vous spécifiez
queues : *
et aucune file d'attente n'est mise en pause, car nous voulons cibler toutes les files d'attente.
Dans d'autres cas, nous devons disposer d'une liste de files d'attente à filtrer, dans l'ordre, car nous ne pouvons filtrer que sur une seule file d'attente à la fois pour garantir que nous utilisons un index pour trier. Cela signifie que si vous spécifiez vos files d'attente comme :
queues : beta*
nous devrons d'abord obtenir une liste de toutes les files d'attente existantes correspondant à ce préfixe, avec une requête qui ressemblerait à ceci :
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
WHERE queue_name LIKE ' beta% ' ;
Ce type de requête DISTINCT
sur une colonne qui est la colonne la plus à gauche d'un index peut être effectué très rapidement dans MySQL grâce à une technique appelée Loose Index Scan. PostgreSQL et SQLite, cependant, n'implémentent pas cette technique, ce qui signifie que si votre table solid_queue_ready_executions
est très grande parce que vos files d'attente deviennent très profondes, cette requête deviendra lente. Normalement, votre table solid_queue_ready_executions
sera petite, mais cela peut arriver.
De la même manière que pour l'utilisation de préfixes, la même chose se produira si vous avez mis des files d'attente en pause, car nous devons obtenir une liste de toutes les files d'attente avec une requête telle que
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
puis supprimez ceux en pause. La pause en général devrait être quelque chose de rare, utilisé dans des circonstances particulières et pour une courte période de temps. Si vous ne souhaitez plus traiter les tâches d'une file d'attente, la meilleure façon de le faire est de les supprimer de votre liste de files d'attente.
Pour résumer, si vous souhaitez garantir des performances optimales lors des sondages , la meilleure façon d'y parvenir est de toujours leur spécifier des noms exacts et de ne pas suspendre les files d'attente.
Faites ceci :
queues : background, backend
au lieu de ça :
queues : back*
Les travailleurs de Solid Queue utilisent un pool de threads pour exécuter le travail dans plusieurs threads, configurable via le paramètre threads
ci-dessus. En plus de cela, le parallélisme peut être obtenu via plusieurs processus sur une seule machine (configurables via différents travailleurs ou le paramètre processes
ci-dessus) ou par mise à l'échelle horizontale.
Le superviseur est en charge de gérer ces processus et il répond aux signaux suivants :
TERM
, INT
: démarre la terminaison gracieuse. Le superviseur enverra un signal TERM
à ses processus supervisés et attendra jusqu'à SolidQueue.shutdown_timeout
jusqu'à ce qu'ils aient terminé. Si des processus supervisés sont toujours là, il leur enverra un signal QUIT
pour indiquer qu'ils doivent quitter.QUIT
: démarre la terminaison immédiate. Le superviseur enverra un signal QUIT
à ses processus supervisés, les faisant quitter immédiatement. Lors de la réception d'un signal QUIT
, si les travailleurs ont encore des tâches en cours, celles-ci seront renvoyées dans la file d'attente lorsque les processus seront désenregistrés.
Si les processus n'ont aucune chance d'être nettoyés avant de quitter (par exemple si quelqu'un tire un câble quelque part), les tâches en cours peuvent rester réclamées par les processus qui les exécutent. Les processus envoient des battements de cœur, et le superviseur vérifie et élague les processus dont les battements de cœur ont expiré, ce qui libérera toutes les tâches réclamées dans leurs files d'attente. Vous pouvez configurer à la fois la fréquence des battements de cœur et le seuil pour considérer un processus comme mort. Voir la section ci-dessous pour cela.
Vous pouvez configurer la base de données utilisée par Solid Queue via l'option config.solid_queue.connects_to
dans les fichiers de configuration config/application.rb
ou config/environments/production.rb
. Par défaut, une seule base de données est utilisée à la fois pour l'écriture et la lecture, appelée queue
pour correspondre à la configuration de la base de données que vous avez définie lors de l'installation.
Toutes les options disponibles pour Active Record pour plusieurs bases de données peuvent être utilisées ici.
Dans Solid queue, vous pouvez vous connecter à deux moments différents de la vie du superviseur :
start
: après que le superviseur a fini de démarrer et juste avant qu'il ne charge les travailleurs et les répartiteurs.stop
: après avoir reçu un signal ( TERM
, INT
ou QUIT
) et juste avant de démarrer l'arrêt progressif ou immédiat.Et en deux moments différents de la vie d'un travailleur :
worker_start
: après que le travailleur ait fini de démarrer et juste avant de démarrer la boucle d'interrogation.worker_stop
: après avoir reçu un signal ( TERM
, INT
ou QUIT
) et juste avant de commencer l'arrêt progressif ou immédiat (qui est juste exit!
).Vous pouvez utiliser les méthodes suivantes avec un bloc pour ce faire :
SolidQueue . on_start
SolidQueue . on_stop
SolidQueue . on_worker_start
SolidQueue . on_worker_stop
Par exemple:
SolidQueue . on_start { start_metrics_server }
SolidQueue . on_stop { stop_metrics_server }
Ceux-ci peuvent être appelés plusieurs fois pour ajouter plusieurs hooks, mais cela doit se produire avant le démarrage de Solid Queue. Un initialiseur serait un bon endroit pour le faire.
Remarque : Les paramètres de cette section doivent être définis dans votre config/application.rb
ou dans la configuration de votre environnement comme ceci : config.solid_queue.silence_polling = true
Il existe plusieurs paramètres qui contrôlent le fonctionnement de Solid Queue et que vous pouvez également définir :
logger
: le logger que vous souhaitez que Solid Queue utilise. La valeur par défaut est l'enregistreur d'application.
app_executor
: l'exécuteur Rails utilisé pour encapsuler les opérations asynchrones, par défaut l'exécuteur d'application
on_thread_error
: lambda/Proc personnalisé à appeler lorsqu'il y a une erreur dans un thread Solid Queue qui prend l'exception levée comme argument. La valeur par défaut est
-> ( exception ) { Rails . error . report ( exception , handled : false ) }
Ceci n'est pas utilisé pour les erreurs générées lors de l'exécution d'une tâche . Les erreurs qui se produisent dans les tâches sont gérées par retry_on
ou discard_on
d'Active Job et entraîneront finalement l'échec des tâches. Ceci concerne les erreurs qui se produisent dans Solid Queue lui-même.
use_skip_locked
: s'il faut utiliser FOR UPDATE SKIP LOCKED
lors de l'exécution de lectures verrouillées. Cela sera automatiquement détecté à l'avenir, et pour l'instant, vous n'aurez besoin de définir cela sur false
que si votre base de données ne le prend pas en charge. Pour MySQL, ce serait les versions <8, et pour PostgreSQL, les versions <9.5. Si vous utilisez SQLite, cela n'a aucun effet car les écritures sont séquentielles.
process_heartbeat_interval
: l'intervalle de battement de coeur que tous les processus suivront (60 secondes par défaut).
process_alive_threshold
: combien de temps attendre jusqu'à ce qu'un processus soit considéré comme mort après son dernier battement de cœur (par défaut 5 minutes).
shutdown_timeout
: temps que le superviseur attendra depuis qu'il a envoyé le signal TERM
à ses processus supervisés avant de leur envoyer une version QUIT
demandant une terminaison immédiate (par défaut : 5 secondes).
silence_polling
: s'il faut désactiver les journaux Active Record émis lors de l'interrogation des travailleurs et des répartiteurs (la valeur par défaut est true
.
supervisor_pidfile
: chemin vers un fichier pid que le superviseur créera lors du démarrage pour éviter d'exécuter plus d'un superviseur sur le même hôte, ou au cas où vous souhaiteriez l'utiliser pour un contrôle de santé. C'est nil
par défaut.
preserve_finished_jobs
: s'il faut conserver les tâches terminées dans la table solid_queue_jobs
– la valeur par défaut est true
.
clear_finished_jobs_after
: période pendant laquelle conserver les tâches terminées, au cas où preserve_finished_jobs
serait vrai – la valeur par défaut est 1 jour. Remarque : À l'heure actuelle, il n'existe aucun nettoyage automatique des tâches terminées. Vous devrez le faire en appelant périodiquement SolidQueue::Job.clear_finished_in_batches
, mais cela se produira automatiquement dans un avenir proche.
default_concurrency_control_period
: la valeur à utiliser par défaut pour le paramètre duration
dans les contrôles de concurrence. La durée par défaut est de 3 minutes.
Solid Queue déclenchera une SolidQueue::Job::EnqueueError
pour toute erreur Active Record qui se produit lors de la mise en file d'attente d'un travail. La raison pour laquelle ActiveJob::EnqueueError
n'est pas déclenchée est que celle-ci est gérée par Active Job, ce qui oblige perform_later
à renvoyer false
et à définir job.enqueue_error
, cédant le travail à un bloc que vous devez transmettre à perform_later
. Cela fonctionne très bien pour vos propres tâches, mais rend l'échec très difficile à gérer pour les tâches mises en file d'attente par Rails ou d'autres gemmes, telles que Turbo::Streams::BroadcastJob
ou ActiveStorage::AnalyzeJob
, car vous ne contrôlez pas l'appel à perform_later
dans ces cas-là.
Dans le cas de tâches récurrentes, si une telle erreur est générée lors de la mise en file d'attente du travail correspondant à la tâche, elle sera traitée et enregistrée mais elle ne remontera pas.
Solid Queue étend Active Job avec des contrôles de concurrence, qui vous permettent de limiter le nombre de tâches d'un certain type ou avec certains arguments pouvant être exécutées en même temps. Lorsqu'elles sont limitées de cette manière, l'exécution des tâches sera bloquée et elles resteront bloquées jusqu'à ce qu'une autre tâche se termine et les débloque, ou après l'expiration du délai d'expiration défini ( durée de la limite de concurrence). Les emplois ne sont jamais supprimés ou perdus, mais simplement bloqués.
class MyJob < ApplicationJob
limits_concurrency to : max_concurrent_executions , key : -> ( arg1 , arg2 , ** ) { ... } , duration : max_interval_to_guarantee_concurrency_limit , group : concurrency_group
# ...
key
est le seul paramètre obligatoire, et il peut s'agir d'un symbole, d'une chaîne ou d'un proc qui reçoit les arguments du travail en tant que paramètres et sera utilisé pour identifier les travaux qui doivent être limités ensemble. Si le processus renvoie un enregistrement Active Record, la clé sera construite à partir de son nom de classe et id
.to
vaut 1
par défaut.duration
est définie par défaut sur SolidQueue.default_concurrency_control_period
, qui elle-même est définie par défaut sur 3 minutes
, mais que vous pouvez également configurer.group
est utilisé pour contrôler la simultanéité de différentes catégories d’emplois. Il s'agit par défaut du nom de la classe d'emplois. Lorsqu'une tâche inclut ces contrôles, nous garantirons qu'au plus le nombre de tâches (indiqué to
) qui génèrent la même key
seront exécutées simultanément, et cette garantie durera pendant duration
de chaque tâche mise en file d'attente. Notez qu'il n'y a aucune garantie sur l'ordre d'exécution , uniquement sur les tâches exécutées en même temps (chevauchement).
Les limites de concurrence utilisent le concept de sémaphores lors de la mise en file d'attente et fonctionnent comme suit : lorsqu'un travail est mis en file d'attente, nous vérifions s'il spécifie des contrôles de concurrence. Si c'est le cas, nous vérifions le sémaphore pour la clé de concurrence calculée. Si le sémaphore est ouvert, nous le réclamons et nous définissons le travail comme prêt . Prêt signifie qu'il peut être récupéré par les travailleurs pour exécution. Lorsque l'exécution du travail termine (que ce soit avec succès ou sans succès, entraînant un échec d'exécution), nous signalons le sémaphore et essayons de débloquer le travail suivant avec la même clé, le cas échéant. Débloquer la tâche suivante ne signifie pas exécuter cette tâche immédiatement, mais la faire passer de bloqué à prêt . Puisque quelque chose peut arriver qui empêche le premier travail de libérer le sémaphore et de débloquer le travail suivant (par exemple, quelqu'un débranche la machine sur laquelle le travailleur est en cours d'exécution), nous avons la duration
comme sécurité. Les emplois qui ont été bloqués pendant plus que la durée sont candidats à être libérés, mais seulement autant d'entre eux que le permettent les règles de concurrence, car chacun devrait passer par le contrôle de danse du sémaphore. Cela signifie que la duration
ne concerne pas vraiment le travail mis en file d'attente ou en cours d'exécution, mais plutôt les travaux bloqués en attente.
Par exemple:
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to : 2 , key : -> ( contact ) { contact . account } , duration : 5 . minutes
def perform ( contact )
# ...
Où contact
et account
sont des enregistrements ActiveRecord
. Dans ce cas, nous veillerons à ce qu'au plus deux tâches du type DeliverAnnouncementToContact
pour le même compte s'exécutent simultanément. Si, pour une raison quelconque, l'un de ces travaux prend plus de 5 minutes ou ne libère pas son verrou de concurrence (signale le sémaphore) dans les 5 minutes suivant son acquisition, un nouveau travail avec la même clé pourrait obtenir le verrou.
Voyons un autre exemple utilisant group
:
class Box :: MovePostingsByContactToDesignatedBoxJob < ApplicationJob
limits_concurrency key : -> ( contact ) { contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( contact )
# ...
class Bundle :: RebundlePostingsJob < ApplicationJob
limits_concurrency key : -> ( bundle ) { bundle . contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( bundle )
# ...
Dans ce cas, si nous avons un travail Box::MovePostingsByContactToDesignatedBoxJob
mis en file d'attente pour un enregistrement de contact avec l'ID 123
et un autre travail Bundle::RebundlePostingsJob
mis en file d'attente simultanément pour un enregistrement de bundle faisant référence au contact 123
, un seul d'entre eux sera autorisé à continuer. L'autre restera bloqué jusqu'à la fin du premier (ou 15 minutes, quoi qu'il arrive en premier).
Notez que le paramètre duration
dépend indirectement de la valeur de concurrency_maintenance_interval
que vous définissez pour votre (vos) répartiteur(s), car ce serait la fréquence à laquelle les tâches bloquées sont vérifiées et débloquées. En général, vous devez définir duration
de manière à ce que tous vos travaux se terminent bien en dessous de cette durée et considérer la tâche de maintenance simultanée comme une sécurité intégrée en cas de problème.
Les tâches sont débloquées par ordre de priorité mais l'ordre de la file d'attente n'est pas pris en compte pour le déblocage des tâches. Cela signifie que si vous avez un groupe de tâches partageant un groupe de concurrence mais se trouvant dans des files d'attente différentes, ou si des tâches de la même classe que vous mettez en file d'attente dans différentes files d'attente, l'ordre de file d'attente que vous avez défini pour un travailleur n'est pas pris en compte lors du déblocage des tâches bloquées. ceux. La raison en est qu'un travail en cours d'exécution débloque le suivant et que le travail lui-même ne connaît pas l'ordre de file d'attente d'un travailleur particulier (vous pouvez même avoir différents travailleurs avec des ordres de file d'attente différents), il ne peut connaître que la priorité. Une fois que les tâches bloquées sont débloquées et disponibles pour interrogation, elles seront récupérées par un travailleur suivant son ordre de file d'attente.
Enfin, les tâches ayant échoué qui sont réessayées automatiquement ou manuellement fonctionnent de la même manière que les nouvelles tâches mises en file d'attente : elles entrent dans la file d'attente pour obtenir un sémaphore ouvert, et chaque fois qu'elles l'obtiennent, elles seront exécutées. Peu importe s'ils avaient déjà obtenu un sémaphore ouvert dans le passé.
Solid Queue n'inclut aucun mécanisme de nouvelle tentative automatique, il s'appuie sur Active Job pour cela. Les tâches qui échouent seront conservées dans le système et une exécution ayant échoué (un enregistrement dans la table solid_queue_failed_executions
) sera créée pour celles-ci. Le travail y restera jusqu'à ce qu'il soit manuellement supprimé ou remis en file d'attente. Vous pouvez le faire dans une console comme :
failed_execution = SolidQueue :: FailedExecution . find ( ... ) # Find the failed execution related to your job
failed_execution . error # inspect the error
failed_execution . retry # This will re-enqueue the job as if it was enqueued for the first time
failed_execution . discard # This will delete the job from the system
Cependant, nous vous recommandons de jeter un œil à mission_control-jobs, un tableau de bord où, entre autres choses, vous pouvez examiner et réessayer/abandonner les tâches ayant échoué.
Certains services de suivi des erreurs qui s'intègrent à Rails, tels que Sentry ou Rollbar, se connectent à Active Job et signalent automatiquement les erreurs non gérées qui se produisent lors de l'exécution du travail. Cependant, si votre système de suivi des erreurs ne le fait pas, ou si vous avez besoin de rapports personnalisés, vous pouvez vous connecter vous-même à Active Job. Une manière possible de procéder serait la suivante :
# application_job.rb
class ApplicationJob < ActiveJob :: Base
rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Notez que vous devrez également dupliquer la logique ci-dessus sur ActionMailer::MailDeliveryJob
. En effet, ActionMailer
n'hérite pas de ApplicationJob
mais utilise plutôt ActionMailer::MailDeliveryJob
qui hérite de ActiveJob::Base
.
# application_mailer.rb
class ApplicationMailer < ActionMailer :: Base
ActionMailer :: MailDeliveryJob . rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Nous fournissons un plugin Puma si vous souhaitez exécuter le superviseur de Solid Queue avec Puma et que Puma le surveille et le gère. Il vous suffit d'ajouter
plugin :solid_queue
à votre configuration puma.rb
Parce que cela peut être assez délicat et que beaucoup de gens ne devraient pas avoir à s'en inquiéter, par défaut, Solid Queue est configuré dans une base de données différente de celle de l'application principale, la mise en file d'attente des tâches est différée jusqu'à ce qu'une transaction en cours soit validée grâce à la fonctionnalité intégrée d'Active Job. capacité de le faire. Cela signifie que même si vous exécutez Solid Queue dans la même base de données que votre application, vous ne profiterez pas de cette intégrité transactionnelle.
Si vous préférez modifier cela, vous pouvez définir config.active_job.enqueue_after_transaction_commit
sur never
. Vous pouvez également définir cela pour chaque tâche.
Si vous définissez cette option sur never
mais que vous souhaitez tout de même vous assurer que vous ne portez pas atteinte par inadvertance à l'intégrité transactionnelle, vous pouvez vous assurer que :
Vos tâches reposant sur des données spécifiques sont toujours mises en file d'attente lors des rappels after_commit
ou autrement à partir d'un endroit où vous êtes certain que les données que la tâche utilisera ont été validées dans la base de données avant que la tâche ne soit mise en file d'attente.
Ou bien, vous configurez une base de données différente pour Solid Queue, même si elle est identique à votre application, en vous assurant qu'une connexion différente sur les demandes de traitement des threads ou les tâches en cours d'exécution pour votre application sera utilisée pour mettre les tâches en file d'attente. Par exemple:
class ApplicationRecord < ActiveRecord :: Base
self . abstract_class = true
connects_to database : { writing : :primary , reading : :replica }
config . solid_queue . connects_to = { database : { writing : :primary , reading : :replica } }
Solid Queue prend en charge la définition de tâches récurrentes qui s'exécutent à des moments précis dans le futur, sur une base régulière, comme les tâches cron. Ceux-ci sont gérés par le processus planificateur et sont définis dans leur propre fichier de configuration. Par défaut, le fichier se trouve dans config/recurring.yml
, mais vous pouvez définir un chemin différent en utilisant la variable d'environnement SOLID_QUEUE_RECURRING_SCHEDULE
ou en utilisant l'option --recurring_schedule_file
avec bin/jobs
, comme ceci :
bin/jobs --recurring_schedule_file=config/schedule.yml
La configuration elle-même ressemble à ceci :
production :
a_periodic_job :
class : MyJob
args : [ 42, { status: "custom_status" } ]
schedule : every second
a_cleanup_task :
command : " DeletedStuff.clear_all "
schedule : every day at 9am
Les tâches sont spécifiées sous forme de hachage/dictionnaire, où la clé sera la clé de la tâche en interne. Chaque tâche doit avoir soit une class
, qui sera la classe de travail à mettre en file d'attente, soit une command
, qui sera évaluée dans le contexte d'un travail ( SolidQueue::RecurringJob
) qui sera mis en file d'attente selon son calendrier, dans la file d'attente solid_queue_recurring
.
Chaque tâche doit également avoir un calendrier, qui est analysé à l'aide de Fugit, afin qu'elle accepte tout ce que Fugit accepte comme cron. Vous pouvez éventuellement fournir les éléments suivants pour chaque tâche :
args
: les arguments à transmettre au travail, sous forme d'argument unique, de hachage ou de tableau d'arguments pouvant également inclure des kwargs comme dernier élément du tableau.La tâche dans l'exemple de configuration ci-dessus sera mise en file d'attente toutes les secondes comme :
MyJob . perform_later ( 42 , status : "custom_status" )
queue
: une file d'attente différente à utiliser lors de la mise en file d'attente du travail. S'il n'y en a pas, la file d'attente est configurée pour la classe d'emplois.
priority
: une valeur de priorité numérique à utiliser lors de la mise en file d'attente du travail.
Les tâches sont mises en file d'attente aux heures correspondantes par le planificateur, et chaque tâche planifie la suivante. Ceci est assez inspiré de ce que fait GoodJob.
Il est possible d'exécuter plusieurs planificateurs avec la même configuration recurring_tasks
, par exemple, si vous disposez de plusieurs serveurs pour la redondance et que vous exécutez le scheduler
sur plusieurs d'entre eux. Pour éviter de mettre en file d'attente des tâches en double en même temps, une entrée dans une nouvelle table solid_queue_recurring_executions
est créée dans la même transaction que la mise en file d'attente du travail. Cette table a un index unique sur task_key
et run_at
, garantissant qu'une seule entrée par tâche et par heure sera créée. Cela ne fonctionne que si preserve_finished_jobs
est défini sur true
(valeur par défaut), et la garantie s'applique tant que vous conservez les tâches.
Remarque : un seul planning récurrent est pris en charge, vous pouvez donc avoir plusieurs planificateurs utilisant le même planning, mais pas plusieurs planificateurs utilisant des configurations différentes.
Enfin, il est possible de configurer des tâches qui ne sont pas gérées par Solid Queue. Autrement dit, vous pouvez avoir un travail comme celui-ci dans votre application :
class MyResqueJob < ApplicationJob
self . queue_adapter = :resque
def perform ( arg )
# ..
end
end
Vous pouvez toujours configurer cela dans Solid Queue :
my_periodic_resque_job :
class : MyResqueJob
args : 22
schedule : " */5 * * * * "
et le travail sera mis en file d'attente via perform_later
afin qu'il s'exécute dans Resque. Cependant, dans ce cas, nous ne suivrons aucun enregistrement solid_queue_recurring_execution
et il n'y aura aucune garantie que le travail ne soit mis en file d'attente qu'une seule fois à chaque fois.
Solid Queue a été inspiré par Resque et GoodJob. Nous vous recommandons de consulter ces projets car ce sont d'excellents exemples dont nous avons beaucoup appris.
La gemme est disponible en open source selon les termes de la licence MIT.