Potencia tus colas con una interfaz profesional:
Obtenga una descripción completa de todas sus colas.
Inspeccione trabajos, busque, vuelva a intentar o promueva trabajos retrasados.
Métricas y estadísticas.
y muchas más funciones.
Regístrese en Taskforce.sh
Uso mínimo de CPU debido a un diseño sin sondeo.
Diseño robusto basado en Redis.
Trabajos retrasados.
Programe y repita trabajos de acuerdo con una especificación cron.
Limitador de tarifas para trabajos.
Reintentos.
Prioridad.
Concurrencia.
Pausar/reanudar: global o localmente.
Múltiples tipos de trabajos por cola.
Funciones de procesamiento enhebradas (en zona de pruebas).
Recuperación automática de fallas en el proceso.
Y próximamente en la hoja de ruta...
Acuse de recibo de finalización del trabajo (mientras tanto, puede utilizar el patrón de cola de mensajes).
Relaciones laborales entre padres e hijos.
Existen algunas UI de terceros que puede utilizar para monitorear:
ToroMQ
grupo de trabajo
toro v3
grupo de trabajo
tablero de toros
respuesta al toro
monitor de toros
Monitoreo
Toro <= v2
Matador
reaccionar-toro
Toureiro
Con Prometheus Bull Queue Exporter
Dado que existen algunas soluciones de colas de trabajos, aquí hay una tabla que las compara:
Dragonfly es un nuevo reemplazo directo de Redis™ que es totalmente compatible con BullMQ y ofrece algunas ventajas importantes sobre Redis™, como un rendimiento enormemente mejor al utilizar todos los núcleos de CPU disponibles y estructuras de datos más rápidas y eficientes en memoria. Lea más aquí sobre cómo usarlo con BullMQ. | |
Si necesita instancias de Redis de producción de alta calidad para su proyecto Bull, considere suscribirse a Memetria para Redis, líder en alojamiento de Redis que funciona perfectamente con BullMQ. Utilice el código de promoción "BULLMQ" al registrarse para ayudarnos a patrocinar el desarrollo de BullMQ. |
Característica | BullMQ-Pro | ToroMQ | Toro | Kue | Abeja | Orden del día |
---|---|---|---|---|---|---|
backend | Redistribuir | Redistribuir | Redistribuir | Redistribuir | Redistribuir | mongo |
observables | ✓ | |||||
Límite de tarifa de grupo | ✓ | |||||
Soporte grupal | ✓ | |||||
Soporte de lotes | ✓ | |||||
Dependencias entre padres e hijos | ✓ | ✓ | ||||
Prioridades | ✓ | ✓ | ✓ | ✓ | ✓ | |
concurrencia | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Trabajos retrasados | ✓ | ✓ | ✓ | ✓ | ✓ | |
Eventos globales | ✓ | ✓ | ✓ | ✓ | ||
Limitador de velocidad | ✓ | ✓ | ✓ | |||
Pausa/Reanudar | ✓ | ✓ | ✓ | ✓ | ||
Trabajador de espacio aislado | ✓ | ✓ | ✓ | |||
Trabajos repetibles | ✓ | ✓ | ✓ | ✓ | ||
Operaciones atómicas | ✓ | ✓ | ✓ | ✓ | ||
Persistencia | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
interfaz de usuario | ✓ | ✓ | ✓ | ✓ | ✓ | |
Optimizado para | Trabajos / Mensajes | Trabajos / Mensajes | Trabajos / Mensajes | Empleos | Mensajes | Empleos |
npm instala toro --guardar
o
hilo agregar toro
Requisitos: Bull requiere una versión de Redis mayor o igual a 2.8.18
.
npm instala @types/bull --save-dev
hilo agregar --dev @types/bull
Actualmente, las definiciones se mantienen en el repositorio DefinitelyTyped.
Damos la bienvenida a todo tipo de contribuciones, ya sea correcciones de código, nuevas funciones o mejoras de documentos. El formato del código lo aplica más bonito. Para confirmaciones, siga la convención de confirmaciones convencional. Todo el código debe pasar reglas de pelusa y conjuntos de pruebas antes de poder fusionarse en desarrollo.
const Queue = require('bull');const videoQueue = new Queue('transcodificación de video', 'redis://127.0.0.1:6379');const audioQueue = new Queue('transcodificación de audio', { redis: { puerto : 6379, host: '127.0.0.1', contraseña: 'foobared' } }); // Especifique la conexión de Redis usando objectconst imageQueue = new Queue('transcodificación de imágenes');const pdfQueue = new Queue('transcodificación de pdf');videoQueue.process(function (trabajo, hecho) { // job.data contiene los datos personalizados pasados cuando se creó el trabajo // job.id contiene la identificación de este trabajo. // transcodificar vídeo de forma asincrónica e informar el progreso trabajo.progreso(42); // llamada realizada cuando termine hecho(); // o dar un error si error hecho (nuevo error ('transcodificación de error')); // o pasarle un resultado hecho(null, { framerate: 29.5 /* etc... */ }); // Si el trabajo arroja una excepción no controlada, también se maneja correctamente throw new Error('algún error inesperado');});audioQueue.process(function (trabajo, hecho) { // transcodifica audio de forma asincrónica e informa el progreso trabajo.progreso(42); // llamada realizada cuando termine hecho(); // o dar un error si error hecho (nuevo error ('transcodificación de error')); // o pasarle un resultado hecho(nulo, { frecuencia de muestreo: 48000 /* etc... */ }); // Si el trabajo arroja una excepción no controlada, también se maneja correctamente throw new Error('algún error inesperado');});imageQueue.process(function (trabajo, hecho) { // transcodificar la imagen de forma asincrónica e informar el progreso trabajo.progreso(42); // llamada realizada cuando termine hecho(); // o dar un error si error hecho (nuevo error ('transcodificación de error')); // o pasarle un resultado hecho(nulo, {ancho: 1280, alto: 720 /* etc... */ }); // Si el trabajo arroja una excepción no controlada, también se maneja correctamente throw new Error('algún error inesperado');});pdfQueue.process(function (trabajo) { // Los procesadores también pueden devolver promesas en lugar de utilizar la devolución de llamada hecha return pdfAsyncProcessor();});videoQueue.add({ video: 'http://example.com/video1.mov' });audioQueue.add({ audio: 'http://example.com/audio1.mp3 ' });imageQueue.add({ imagen: 'http://example.com/image1.tiff' });
Alternativamente, puedes devolver promesas en lugar de utilizar la devolución de llamada done
:
videoQueue.process(function (job) { // ¡no olvides eliminar la devolución de llamada realizada! // Simplemente devuelve una promesa return fetchVideo(job.data.url).luego(transcodeVideo); // Maneja el rechazo de la promesa return Promise.reject(new Error('transcodificación de error')); // Pasa el valor con el que se resuelve la promesa al evento "completado" return Promise.resolve({ framerate: 29.5 /* etc... */ }); // Si el trabajo arroja una excepción no controlada, también se maneja correctamente lanzar nuevo Error('algún error inesperado'); // igual que return Promise.reject(new Error('algún error inesperado'));});
La función de proceso también se puede ejecutar en un proceso separado. Esto tiene varias ventajas:
El proceso está protegido por lo que si falla no afecta al trabajador.
Puede ejecutar código de bloqueo sin afectar la cola (los trabajos no se detendrán).
Utilización mucho mejor de las CPU multinúcleo.
Menos conexiones a redis.
Para utilizar esta función, simplemente cree un archivo separado con el procesador:
// procesador.jsmodule.exports = función (trabajo) { // Haz un trabajo pesado devolver Promesa.resolve(resultado);}
Y define el procesador así:
// Proceso único:queue.process('/path/to/my/processor.js');// También puedes usar concurrencia:queue.process(5, '/path/to/my/processor.js' );// y procesadores con nombre:queue.process('mi procesador', 5, '/ruta/a/mi/procesador.js');
Se puede agregar un trabajo a una cola y procesarlo repetidamente según una especificación cron:
pagosQueue.process(función (trabajo) {// Verificar pagos }); // Repetir el trabajo de pago una vez al día a las 3:15 (am) pagosQueue.add(pagosData, { repetir: { cron: '15 3 * * *' } });
Como consejo, revisa tus expresiones aquí para verificar que sean correctas: generador de expresiones cron
Una cola se puede pausar y reanudar globalmente (pase true
para pausar el procesamiento solo para este trabajador):
cola.pausa().entonces(función () { // la cola está en pausa ahora});queue.resume().then(function () { // la cola se reanuda ahora})
Una cola emite algunos eventos útiles, por ejemplo...
.on('completado', función (trabajo, resultado) { // ¡Trabajo completado con resultado de salida!})
Para obtener más información sobre eventos, incluida la lista completa de eventos que se activan, consulte la referencia de Eventos
Las colas son baratas, así que si necesitas muchas simplemente crea otras nuevas con nombres diferentes:
const usuarioJohn = nueva cola('john');const usuarioLisa = nueva cola('lisa');...
Sin embargo, cada instancia de cola requerirá nuevas conexiones de Redis, consulte cómo reutilizar las conexiones o también puede usar procesadores con nombre para lograr un resultado similar.
NOTA: A partir de la versión 3.2.0 y superiores, se recomienda utilizar procesadores con subprocesos.
Las colas son sólidas y se pueden ejecutar en paralelo en varios subprocesos o procesos sin ningún riesgo de peligros o corrupción de colas. Consulte este ejemplo sencillo que utiliza un clúster para paralelizar trabajos entre procesos:
const Queue = require('bull');const cluster = require('cluster');const numWorkers = 8;const queue = new Queue('prueba de cola concurrente');if (cluster.isMaster) { for (let i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (worker) {// Creemos algunos trabajos para los trabajadores de la colafor (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); cluster.on('salida', función (trabajador, código, señal) {console.log('trabajador ' + trabajador.proceso.pid + 'muerto'); });} demás { queue.process(function (trabajo, trabajoDone) {console.log('Trabajo realizado por el trabajador', cluster.worker.id, job.id);jobDone(); });}
Para obtener la documentación completa, consulte la referencia y los patrones comunes:
Guía: su punto de partida para desarrollar con Bull.
Referencia: documento de referencia con todos los objetos y métodos disponibles.
Patrones: un conjunto de ejemplos de patrones comunes.
Licencia, la licencia Bull, es el MIT.
Si ve algo que pueda necesitar más documentos, envíe una solicitud de extracción.
La cola apunta a una estrategia de trabajo "al menos una vez". Esto significa que, en algunas situaciones, un trabajo podría procesarse más de una vez. Esto sucede principalmente cuando un trabajador no mantiene un candado para un trabajo determinado durante la duración total del procesamiento.
Cuando un trabajador está procesando un trabajo, lo mantendrá "bloqueado" para que otros trabajadores no puedan procesarlo.
Es importante comprender cómo funciona el bloqueo para evitar que sus trabajos pierdan el bloqueo (se detengan ) y, como resultado, se reinicien. El bloqueo se implementa internamente creando un bloqueo para lockDuration
en el intervalo lockRenewTime
(que suele ser la mitad lockDuration
). Si lockDuration
transcurre antes de que se pueda renovar el bloqueo, el trabajo se considerará detenido y se reiniciará automáticamente; será procesado dos veces . Esto puede suceder cuando:
El proceso de Nodo que ejecuta su procesador de trabajos finaliza inesperadamente.
Su procesador de trabajo consumía demasiado la CPU y detuvo el ciclo de eventos del Nodo y, como resultado, Bull no pudo renovar el bloqueo del trabajo (consulte el n.° 488 para saber cómo podemos detectar mejor esto). Puede solucionar este problema dividiendo su procesador de trabajos en partes más pequeñas para que ninguna parte pueda bloquear el bucle de eventos del Nodo. Alternativamente, puede pasar un valor mayor para la configuración lockDuration
(con la desventaja de que tomará más tiempo reconocer un trabajo realmente estancado).
Como tal, siempre debe escuchar el evento stalled
y registrarlo en su sistema de monitoreo de errores, ya que esto significa que es probable que sus trabajos se procesen dos veces.
Como medida de seguridad para que los trabajos problemáticos no se reinicien indefinidamente (por ejemplo, si el procesador de trabajos siempre falla su proceso de Nodo), los trabajos se recuperarán de un estado estancado un máximo de maxStalledCount
veces (predeterminado: 1
).