Boostez vos files d'attente avec un frontal professionnel :
Obtenez un aperçu complet de toutes vos files d’attente.
Inspectez les tâches, recherchez, réessayez ou promouvez les tâches retardées.
Mesures et statistiques.
et bien d'autres fonctionnalités.
Inscrivez-vous sur Taskforce.sh
Utilisation minimale du processeur grâce à une conception sans interrogation.
Conception robuste basée sur Redis.
Travaux retardés.
Planifiez et répétez les tâches selon une spécification cron.
Limiteur de débit pour les travaux.
Nouvelles tentatives.
Priorité.
Concurrence.
Pause/reprise – globalement ou localement.
Plusieurs types de travaux par file d'attente.
Fonctions de traitement threadées (en bac à sable).
Récupération automatique en cas de panne de processus.
Et à venir sur la feuille de route...
Accusé de réception de la fin du travail (vous pouvez utiliser le modèle de file d'attente de messages en attendant).
Relations professionnelles parents-enfants.
Il existe quelques interfaces utilisateur tierces que vous pouvez utiliser pour la surveillance :
BullMQ
Groupe de travail
Taureau v3
Groupe de travail
planche à taureaux
taureau-rempl
moniteur de taureau
Moniteur
Taureau <= v2
Matador
réagir-bull
Toureiro
Avec l'exportateur de files d'attente Prometheus Bull
Puisqu'il existe plusieurs solutions de file d'attente de tâches, voici un tableau les comparant :
Dragonfly est un nouveau remplacement direct de Redis™ qui est entièrement compatible avec BullMQ et apporte des avantages importants par rapport à Redis™, tels qu'une amélioration considérable des performances en utilisant tous les cœurs de processeur disponibles et des structures de données plus rapides et plus économes en mémoire. Apprenez-en plus ici sur la façon de l'utiliser avec BullMQ. | |
Si vous avez besoin d'instances Redis de production de haute qualité pour votre projet Bull, pensez à vous abonner à Memetria for Redis, leader de l'hébergement Redis qui fonctionne parfaitement avec BullMQ. Utilisez le code promo « BULLMQ » lors de votre inscription pour nous aider à sponsoriser le développement de BullMQ ! |
Fonctionnalité | BullMQ-Pro | BullMQ | Taureau | Kué | Abeille | Ordre du jour |
---|---|---|---|---|---|---|
Back-end | redis | redis | redis | redis | redis | mongo |
Observables | ✓ | |||||
Limite de tarif de groupe | ✓ | |||||
Soutien de groupe | ✓ | |||||
Prise en charge des lots | ✓ | |||||
Dépendances parent/enfant | ✓ | ✓ | ||||
Priorités | ✓ | ✓ | ✓ | ✓ | ✓ | |
Concurrence | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Travaux retardés | ✓ | ✓ | ✓ | ✓ | ✓ | |
Événements mondiaux | ✓ | ✓ | ✓ | ✓ | ||
Limiteur de débit | ✓ | ✓ | ✓ | |||
Pause/Reprise | ✓ | ✓ | ✓ | ✓ | ||
Travailleur en bac à sable | ✓ | ✓ | ✓ | |||
Travaux répétables | ✓ | ✓ | ✓ | ✓ | ||
Opérations atomiques | ✓ | ✓ | ✓ | ✓ | ||
Persistance | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Interface utilisateur | ✓ | ✓ | ✓ | ✓ | ✓ | |
Optimisé pour | Emplois / Messages | Emplois / Messages | Emplois / Messages | Emplois | Messages | Emplois |
npm installer taureau --save
ou
fil ajouter taureau
Exigences : Bull nécessite une version Redis supérieure ou égale à 2.8.18
.
npm install @types/bull --save-dev
fil ajouter --dev @types/bull
Les définitions sont actuellement conservées dans le dépôt DefinitelyTyped.
Nous acceptons tous les types de contributions, qu'il s'agisse de corrections de code, de nouvelles fonctionnalités ou d'améliorations de la documentation. Le formatage du code est appliqué de manière plus jolie. Pour les commits, veuillez suivre la convention de commits conventionnelle. Tout le code doit satisfaire aux règles de charpie et aux suites de tests avant de pouvoir être fusionné dans le développement.
const Queue = require('bull');const videoQueue = new Queue('transcodage vidéo', 'redis://127.0.0.1:6379');const audioQueue = new Queue('transcodage audio', { redis: { port : 6379, hôte : '127.0.0.1', mot de passe : 'foobared' } }); // Spécifiez la connexion Redis à l'aide de objectconst imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (job, done) { // job.data contient les données personnalisées transmises lors de la création du travail // job.id contient l'identifiant de ce travail. // transcode la vidéo de manière asynchrone et rapporte la progression job.progress(42); // appel terminé une fois terminé fait(); // ou donne une erreur si erreur done(new Error('erreur de transcodage')); // ou lui passe un résultat done(null, { framerate : 29,5 /* etc... */ }); // Si le travail lève une exception non gérée, elle est également gérée correctement throw new Error('une erreur inattendue');});audioQueue.process(function (job, done) { // transcode l'audio de manière asynchrone et rapporte la progression job.progress(42); // appel terminé une fois terminé fait(); // ou donne une erreur si erreur done(new Error('erreur de transcodage')); // ou lui passe un résultat done(null, { samplerate: 48000 /* etc... */ }); // Si le travail lève une exception non gérée, elle est également gérée correctement throw new Error('une erreur inattendue');});imageQueue.process(function (job, done) { // transcode l'image de manière asynchrone et rapporte la progression job.progress(42); // appel terminé une fois terminé fait(); // ou donne une erreur si erreur done(new Error('erreur de transcodage')); // ou lui passe un résultat done(null, { largeur : 1280, hauteur : 720 /* etc... */ }); // Si le travail lève une exception non gérée, elle est également gérée correctement throw new Error('une erreur inattendue');});pdfQueue.process(function (job) { // Les processeurs peuvent également renvoyer des promesses au lieu d'utiliser le rappel done return pdfAsyncProcessor();});videoQueue.add({ vidéo : 'http://example.com/video1.mov' });audioQueue.add({ audio : 'http://example.com/audio1.mp3 ' });imageQueue.add({ image: 'http://example.com/image1.tiff' });
Alternativement, vous pouvez renvoyer des promesses au lieu d'utiliser le rappel done
:
videoQueue.process(function (job) { // n'oubliez pas de supprimer le rappel terminé ! // Renvoie simplement une promesse return fetchVideo(job.data.url).then(transcodeVideo); // Gère le rejet de la promesse return Promise.reject(new Error('erreur de transcodage')); // Passe la valeur avec laquelle la promesse est résolue à l'événement "terminé" return Promise.resolve({ framerate: 29.5 /* etc... */ }); // Si le travail lève une exception non gérée, elle est également gérée correctement throw new Error('une erreur inattendue'); // pareil que return Promise.reject(new Error('une erreur inattendue'));});
La fonction de processus peut également être exécutée dans un processus distinct. Cela présente plusieurs avantages :
Le processus est en mode bac à sable, donc s'il plante, cela n'affecte pas le travailleur.
Vous pouvez exécuter du code de blocage sans affecter la file d'attente (les tâches ne seront pas bloquées).
Bien meilleure utilisation des processeurs multicœurs.
Moins de connexions à Redis.
Pour utiliser cette fonctionnalité, créez simplement un fichier séparé avec le processeur :
// processeur.jsmodule.exports = fonction (travail) { // Effectuer des travaux lourds return Promise.resolve(result);}
Et définissez le processeur comme ceci :
// Processus unique :queue.process('/path/to/my/processor.js');// Vous pouvez également utiliser la concurrence :queue.process(5, '/path/to/my/processor.js' );// et processeurs nommés:queue.process('mon processeur', 5, '/path/to/my/processor.js');
Une tâche peut être ajoutée à une file d'attente et traitée à plusieurs reprises selon une spécification cron :
paymentQueue.process(function (job) {// Vérifier les paiements }); // Répétez le travail de paiement une fois par jour à 3h15 (du matin) paymentQueue.add(paymentsData, { répétition : { cron : '15 3 * * *' } });
Comme conseil, vérifiez vos expressions ici pour vous assurer qu'elles sont correctes : générateur d'expressions cron
Une file d'attente peut être suspendue et reprise globalement (passez true
pour suspendre le traitement uniquement pour ce travailleur) :
file d'attente.pause().then(function () { // la file d'attente est maintenant en pause});queue.resume().then(function () { // la file d'attente est reprise maintenant})
Une file d'attente émet des événements utiles, par exemple...
.on('terminé', fonction (tâche, résultat) { // Travail terminé avec résultat de sortie !})
Pour plus d'informations sur les événements, y compris la liste complète des événements déclenchés, consultez la référence des événements.
Les files d'attente sont bon marché, donc si vous en avez besoin, créez-en simplement de nouvelles avec des noms différents :
const userJohn = new Queue('john');const userLisa = new Queue('lisa');...
Cependant, chaque instance de file d'attente nécessitera de nouvelles connexions Redis, vérifiez comment réutiliser les connexions ou vous pouvez également utiliser des processeurs nommés pour obtenir un résultat similaire.
REMARQUE : à partir de la version 3.2.0, il est recommandé d'utiliser plutôt des processeurs threadés.
Les files d'attente sont robustes et peuvent être exécutées en parallèle dans plusieurs threads ou processus sans aucun risque de danger ou de corruption de file d'attente. Consultez cet exemple simple utilisant un cluster pour paralléliser les tâches entre les processus :
const Queue = require('bull');const cluster = require('cluster');const numWorkers = 8;const queue = new Queue('test concurrent queue');if (cluster.isMaster) { pour (soit i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (worker) {// Créons quelques tâches pour les travailleurs de la file d'attente pour (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); } ; }); cluster.on('exit', function (worker, code, signal) {console.log('worker ' + worker.process.pid + 'mort'); });} autre { queue.process(function (job, jobDone) {console.log('Travail effectué par le travailleur', cluster.worker.id, job.id);jobDone(); });}
Pour la documentation complète, consultez la référence et les modèles courants :
Guide — Votre point de départ pour développer avec Bull.
Référence — Document de référence avec tous les objets et méthodes disponibles.
Modèles - un ensemble d'exemples de modèles courants.
La licence — la licence Bull — c'est le MIT.
Si vous voyez quelque chose qui pourrait utiliser plus de documents, veuillez soumettre une pull request !
La file d'attente vise une stratégie de travail "au moins une fois". Cela signifie que dans certaines situations, une tâche peut être traitée plusieurs fois. Cela se produit principalement lorsqu'un travailleur ne parvient pas à conserver le verrou pour une tâche donnée pendant toute la durée du traitement.
Lorsqu'un travailleur traite un travail, il le gardera "verrouillé" afin que les autres travailleurs ne puissent pas le traiter.
Il est important de comprendre comment fonctionne le verrouillage pour éviter que vos tâches perdent leur verrouillage (se bloquent ) et soient redémarrées en conséquence. Le verrouillage est implémenté en interne en créant un verrou pour lockDuration
sur l'intervalle lockRenewTime
(qui est généralement la moitié lockDuration
). Si lockDuration
s'écoule avant que le verrouillage puisse être renouvelé, le travail sera considéré comme bloqué et sera automatiquement redémarré ; il sera traité deux fois . Cela peut se produire lorsque :
Le processus Node exécutant votre Job Processor s'arrête de manière inattendue.
Votre processeur de travail était trop gourmand en CPU et a bloqué la boucle d'événements Node, et par conséquent, Bull n'a pas pu renouveler le verrouillage du travail (voir #488 pour savoir comment nous pourrions mieux détecter cela). Vous pouvez résoudre ce problème en divisant votre Job Processor en parties plus petites afin qu'aucune partie ne puisse bloquer la boucle d'événements Node. Alternativement, vous pouvez transmettre une valeur plus grande pour le paramètre lockDuration
(le compromis étant qu'il faudra plus de temps pour reconnaître une véritable tâche bloquée).
En tant que tel, vous devez toujours écouter l'événement stalled
et l'enregistrer dans votre système de surveillance des erreurs, car cela signifie que vos tâches seront probablement traitées deux fois.
Pour garantir que les tâches problématiques ne soient pas redémarrées indéfiniment (par exemple, si le processeur de tâches plante toujours son processus Node), les tâches seront récupérées à partir d'un état bloqué un maximum de fois maxStalledCount
(par défaut : 1
).