Motor de cola de tareas respaldado por Redis con control de tareas avanzado y coherencia eventual.
Agrupación de tareas, encadenamiento, iteradores para rangos enormes.
Ejecución de tareas pospuestas y programadas.
Distribución de carga + grupos de trabajadores.
Fácil de empotrar.
idoit
proporciona control avanzado para implementarlo
Agrupación . La tarea group
especial ejecuta las tareas secundarias y espera hasta que se completen todas. Útil para lógica de mapa/reducción.
Encadenamiento . Tarea chain
especial ejecuta a los niños uno por uno. También es útil para reducir mapas o dividir tareas muy complicadas en pasos más simples.
Iterador de mapeo . Característica especial para cargas útiles enormes, para producir trozos bajo demanda. Beneficios:
No hay retrasos en la fase de mapeo, el procesamiento de fragmentos comienza inmediatamente.
Consultas de base de datos fáciles de optimizar para crear fragmentos de igual tamaño (las consultas de omisión + límite son muy lentas con datos enormes).
Progreso . Cuando utiliza escenarios de grupos/cadenas/mapas, es fácil monitorear el progreso total a través del padre superior. Las tareas independientes largas también pueden notificar al usuario sobre el cambio de progreso.
Bolsas de trabajadores . Puede dividir tareas por diferentes procesos. Por ejemplo, si no desea que las tareas pesadas bloqueen las ligeras.
Programador . El cron incorporado permite ejecutar tareas en un cronograma determinado.
Todos los datos en redis son siempre consistentes.
La tarea no se puede perder, pero PUEDE ejecutarse dos veces en casos extremos (si el proceso falla cuando la función de la tarea estaba a punto de finalizar)
El progreso puede contar "más rápido" si se usa task.progressAdd()
y el proceso falla antes de que se complete la tarea. Pero eso no es crítico, ya que dicha información sólo puede usarse para actualizaciones de las barras de progreso de la interfaz. En la mayoría de los casos no verás la diferencia.
Se requiere node.js
6+ y redis
3.0+.
npm instala idoit --guardar
redisURL (Cadena): URL de conexión de Redis.
simultaneidad (Número): máximo de tareas a consumir en paralelo por un solo trabajador, 100 de forma predeterminada.
pool (Cadena): nombre del grupo de trabajadores, "predeterminado" si no está configurado. Se utiliza si esta instancia de cola consume tareas únicamente (después de .start()
). Puede enrutar tareas a grupos específicos de trabajadores para evitar bloqueos no deseados. Puede configurar pool
en Array, [ 'pool1', 'pool2' ]
para consumir tareas de varios grupos (para fines de desarrollo/prueba).
ns (Cadena): espacio de nombres de datos, actualmente utilizado como prefijo de claves de Redis, "idoitqueue:" de forma predeterminada.
Es una buena práctica tener grupos de trabajadores separados para las tareas de bloqueo pesado y las que no lo son. Por ejemplo, nadie debería bloquear el envío de correos electrónicos urgentes. Por lo tanto, cree varios procesos de trabajo, fíjelos a diferentes grupos y establezca la simultaneidad adecuada de tareas. Las tareas sin bloqueo se pueden consumir en paralelo y puede estar bien con concurrency
predeterminada = 100. Las tareas de bloqueo deben consumirse una por una, establezca concurrency
= 1 para esos trabajadores.
Nota. Puede suceder que elimine algunos tipos de tareas de su aplicación. En este caso, los datos huérfanos se borrarán después de 3 días.
Opciones:
nombre (Cadena): el nombre de la tarea.
baseClass (Función): opcional, constructor de la tarea base, "Tarea" de forma predeterminada.
init (Función): opcional, se utiliza para la inicialización de tareas asíncronas y debe devolver Promise
this (Objeto): tarea actual (el total de la tarea está disponible como this.total
).
taskID (función): opcional, debe devolver una nueva identificación de tarea. Solo es necesario para crear tareas "exclusivas", devuelve un valor aleatorio de forma predeterminada, llamado como: function (taskData)
. Azúcar: si pasa una cadena simple, se ajustará a la función, que siempre devuelve esta cadena.
proceso (Función) - función de tarea principal, denominada como: task.process(...args)
. Debería devolver Promise
este (Objeto) - tarea actual.
reintento (Número): opcional, número de reintentos en caso de error, valor predeterminado 2.
retryDelay (Número): opcional, retraso en ms después de los reintentos, valor predeterminado 60000 ms.
tiempo de espera (Número): opcional, tiempo de espera de ejecución, predeterminado 120000 ms.
total (Número): opcional, valor de progreso máximo, predeterminado 1. Si no modifica el comportamiento, el progreso comienza con 0 y se convierte en 1 al finalizar la tarea.
posponerDelay (Número): opcional, si se llama a posponer sin demora, se supone que la demora es igual a este valor (en milisegundos).
cron (Cadena): opcional, cadena cron ("15 */6 * * *"), nulo predeterminado.
pista (Número): predeterminado 3600000 ms (1 hora). Es hora de recordar las tareas programadas desde cron para evitar volver a ejecutarlas si varios servidores del clúster tienen relojes incorrectos. No lo establezcas demasiado alto para tareas muy frecuentes, ya que puede ocupar mucha memoria.
Obtener tarea por identificación. Devuelve una Promesa resuelta con tarea o con null
si la tarea no existe.
Campos de tareas que puede utilizar:
total - progreso total de la tarea
progreso - progreso de la tarea actual
resultado - el resultado de la tarea
error - el error de la tarea
Cancelar tarea. Devuelve una Promesa resuelta con tarea.
Nota. Puede cancelar solo tareas sin padre.
Iniciar trabajador y comenzar a consumir datos de tareas. Promise
de devolución, resuelta cuando la cola esté lista (llame a .ready()
dentro).
Si se especificó pool
en cunstructor, solo se consumirán las tareas enrutadas a esta extracción.
Deja de aceptar nuevas tareas de la cola. Promise
de devolución, resuelta cuando se completen todas las tareas activas de este trabajador.
Promise
de devolución, resuelta cuando la cola está lista para funcionar (después del evento 'conectar', ver más abajo).
Actualiza las opciones del constructor, excepto redisURL.
idoit
es una instancia EventEmitter
que activa algunos eventos:
ready
cuando la conexión de Redis esté activa y se puedan ejecutar comandos (las tareas se pueden registrar sin conexión)
error
cuando ha ocurrido un error.
task:progress
, task:progress:<task_id>
- cuando la tarea actualiza el progreso. Los datos del evento son: {id, uid, total, progreso}
task:end
, task:end:<task_id>
- cuando finaliza la tarea. Los datos del evento son: {id, uid}
Cree una nueva tarea con parámetros opcionales.
Anular las propiedades de la tarea. Por ejemplo, es posible que desee asignar tareas de grupo/cadena específicas a otro grupo.
Ejecute la tarea inmediatamente. Devuelve una Promesa resuelta con id de tarea.
Posponga la ejecución de la tarea para delay
milisegundos (o task.postponeDelay
).
Devuelve una Promesa resuelta con id de tarea.
Reinicie la tarea que se está ejecutando actualmente.
add_retry (booleano): opcional, si se desea aumentar el recuento de reintentos o no (predeterminado: falso)
Si es true
, el número de reintentos aumenta y la tarea no se reinicia en caso de que se exceda.
Si es false
, el recuento de reintentos sigue siendo el mismo, por lo que una tarea puede reiniciarse indefinidamente.
retraso (Número) retraso antes del reinicio en milisegundos (predeterminado: task.retryDelay
).
Tenga en cuenta que idoit
ya tiene una lógica de reinicio incorporada en caso de errores de tareas. Probablemente, no deberías utilizar este método directamente. Está expuesto para casos muy específicos.
Incrementar el progreso de la tarea actual.
Devuelve una Promesa resuelta con id de tarea.
Actualizar la fecha límite de la tarea actual.
Devuelve una Promesa resuelta con id de tarea.
Crea una nueva tarea, ejecutando niños en paralelo.
cola.grupo([ cola.niños1(), cola.niños2(), cola.children3()]).run()
El resultado del grupo es una matriz sin clasificar de resultados secundarios.
Crea una nueva tarea, ejecutando niños en serie. Si alguno de los niños falla, la cadena también falla.
queue.registerTask('multiplicar', (a, b) => a * b);queue.registerTask('restar', (a, b) => a - b);queue.chain([ cola.multiplicar(2, 3), // 2 * 3 = 6 cola.restar(10), // 10 - 6 = 4 cola.multiplicar(3) // 3 * 4 = 12]).ejecutar()
El resultado de la tarea anterior se pasa como último argumento de la siguiente tarea. El resultado de la cadena es el resultado de la última tarea de la cadena.
Una forma especial de ejecutar mapeos enormes en estilo perezoso (bajo demanda). Vea los comentarios a continuación.
// registra el iterador taskqueue.registerTask({ nombre: 'lazy_mapper', baseClass: Cola.Iterator, // Este método se llama al inicio de la tarea y al final de cada hijo. puede ser // una función generadora o función que devuelve `Promise`. * iterar(estado) {// ...// Tres tipos de estados de salida posibles: finalizado, no hacer nada y datos nuevos.//// 1. `null` - final alcanzado, ya no se debe llamar al iterador.// 2. `{}` - inactivo, hay suficientes subtareas en la cola, intente llamar // iterador más tarde (cuando finalice el próximo niño). // 3. {// estado - nuevo estado del iterador para recordar (por ejemplo, desplazamiento for// consulta de base de datos), cualquier dato serializable// tareas: conjunto de nuevas subtareas para colocar en la cola// }//// ¡IMPORTANTE! El iterador se puede llamar en paralelo desde diferentes trabajadores. Nosotros// utilizamos la entrada `estado` para resolver colisiones en la actualización de Redis. Entonces, si // creas nuevas subtareas: //// 1. el nuevo `estado` DEBE ser diferente (para todos los estados anteriores)// 2. la matriz de `tareas` NO DEBE estar vacía.//// En otro caso, debe indicar sobre 'fin' o 'inactivo'.//// La combinación no válida provocará 'fin' + evento de error.//return { state: newState, task: chunksArray}; }});// ejecutar iteratorqueue.lazy_mapper().run();
¿Por qué se inventó esta loca magia?
Imagine que necesita reconstruir 10 millones de publicaciones en un foro. Desea dividir el trabajo en partes pequeñas iguales, pero las publicaciones no tienen una enumeración de números enteros secuenciales, solo ID de Mongo. ¿Qué puedes hacer?
Las solicitudes directas skip
+ limit
son muy costosas en colecciones grandes en cualquier base de datos.
No puedes dividir por intervalos de fechas, porque la densidad de publicaciones varía mucho desde la primera hasta la última publicación.
Puede agregar un campo indexado con un número aleatorio a cada publicación. Luego divida por intervalos. Eso funcionará, pero provocará un acceso aleatorio al disco, lo que no es genial.
La solución es utilizar un mapeador iterativo, que puede recordar la "posición anterior". En este caso, realizará solicitudes range
+ limit
en lugar de skip
+ limit
. Eso funciona bien con las bases de datos. Los bonos adicionales son:
No es necesario mantener todas las subtareas en cola. Por ejemplo, puede crear 100 fragmentos y agregar los siguientes 100 cuando los anteriores estén a punto de terminar.
La fase de mapeo se distribuye y usted puede comenzar a monitorear el progreso total de inmediato.
Redis de ejecución rápida a través de la ventana acoplable:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 ventana acoplable redis1
Por supuesto, estamos familiarizados con el kue, el apio y el akka. Nuestro objetivo era lograr un equilibrio entre simplicidad y potencia. Entonces, no sabemos si idoit
funciona bien en un clúster con miles de instancias. Pero debería funcionar en volúmenes más pequeños y es realmente fácil de usar.
kue no estaba bien para nuestras necesidades, porque:
Su concepto de "prioridades" no es flexible y no protege bien de bloqueos por tareas pesadas.
sin agrupación/encadenamiento de tareas, etc.
No hay garantías sólidas de coherencia de los datos.
En idoit
nos preocupamos por:
operaciones de grupo/cadena de tareas y transferencia de datos entre tareas (similar al apio)
grupos de trabajadores para aislar la ejecución de tareas por tipos.
fácil de usar e instalar (solo se necesita redis, se puede ejecutar en un proceso existente)
eventual coherencia de los datos almacenados
azúcar esencial como programador incorporado
Mapeador iterativo para cargas útiles enormes (característica única, muy útil para muchas tareas de mantenimiento).
seguimiento del progreso de la tarea
evitar bloqueos globales
Redis aún puede ser un punto de falla, pero ese es un precio aceptable por la simplicidad. Por supuesto, puede obtener una mejor disponibilidad a través de buses de mensajes distribuidos como RMQ. Pero en muchos casos es más importante mantener las cosas simples. Con idoit
puedes reutilizar tecnologías existentes sin gastos adicionales.