Moteur de file d'attente de tâches basé sur Redis avec contrôle avancé des tâches et cohérence éventuelle.
Regroupement de tâches, chaînage, itérateurs pour de vastes plages.
Exécution de tâches reportées et planifiées.
Répartition de la charge + pools de travailleurs.
Facile à intégrer.
idoit
fournit un contrôle avancé pour implémenter ainsi
Regroupement . Une tâche group
spéciale exécute les tâches des enfants et attend que tout soit terminé. Utile pour la logique de mappage/réduction.
Enchaînement . Une tâche chain
spéciale exécute les enfants un par un. Également utile pour réduire la carte ou diviser des tâches très compliquées en étapes plus simples.
Itérateur de mappage . Fonctionnalité spéciale pour les charges utiles énormes, pour produire des morceaux à la demande. Avantages:
Aucun retard lors de la phase de mappage, le traitement des morceaux démarre immédiatement.
Requêtes de base de données faciles à optimiser pour créer des morceaux de taille égale (les requêtes de saut + limite sont très lentes sur des données volumineuses).
Progrès . Lorsque vous utilisez des scénarios de groupes/chaînes/cartes, il est facile de surveiller la progression totale via le parent principal. Les tâches autonomes longues peuvent également informer l'utilisateur des changements de progression.
Pools de travailleurs . Vous pouvez diviser les tâches selon différents processus. Par exemple, si vous ne souhaitez pas que les tâches lourdes bloquent les tâches légères.
Planificateur . Le cron intégré permet d'exécuter des tâches selon un calendrier donné.
Toutes les données dans Redis sont toujours cohérentes.
La tâche ne peut pas être perdue, mais PEUT s'exécuter deux fois dans les cas extrêmes (si le processus plante alors que la fonction de la tâche était sur le point de se terminer)
La progression peut être comptée "plus rapidement" si task.progressAdd()
est utilisée et le processus plante avant la fin de la tâche. Mais ce n'est pas critique, puisque ces informations ne peuvent être utilisées que pour les mises à jour des barres de progression de l'interface. Dans la plupart des cas, vous ne verrez pas la différence.
node.js
6+ et redis
3.0+ requis.
npm installer idoit --save
redisURL (String) - URL de connexion Redis.
concurrence (Nombre) - tâches maximales à consommer en parallèle par un seul travailleur, 100 par défaut.
pool (String) - nom du pool de travailleurs, "par défaut" s'il n'est pas défini. Utilisé si cette instance de file d'attente consomme uniquement des tâches (après .start()
). Vous pouvez acheminer les tâches vers des pools spécifiques de travailleurs pour éviter les verrous indésirables. Vous pouvez définir pool
sur Array, [ 'pool1', 'pool2' ]
pour consommer des tâches de plusieurs pools (à des fins de développement/test).
ns (String) - espace de noms de données, actuellement utilisé comme préfixe de clés Redis, "idoitqueue:" par défaut.
C'est une bonne pratique d'avoir des pools de travailleurs séparés pour les tâches lourdes bloquantes et celles non bloquantes. Par exemple, personne ne devrait bloquer l’envoi d’e-mails urgents. Créez donc plusieurs processus de travail, épinglez-les dans différents pools et définissez la simultanéité appropriée des tâches. Les tâches non bloquantes peuvent être exécutées en parallèle, et vous pouvez accepter concurrency
par défaut = 100. Les tâches bloquantes doivent être consommées une par une, définissez concurrency
= 1 pour ces travailleurs.
Note. Il peut arriver que vous supprimiez certains types de tâches de votre application. Dans ce cas, les données orphelines seront effacées après 3 jours.
Possibilités :
name (String) - le nom de la tâche.
baseClass (Fonction) - facultatif, constructeur de la tâche de base, "Task" par défaut.
init (Fonction) - facultatif, utilisé pour l'initialisation des tâches asynchrones, doit renvoyer Promise
this (Object) - tâche en cours (le total de la tâche est disponible sous le nom this.total
).
taskID (Fonction) - facultatif, doit renvoyer un nouvel identifiant de tâche. Nécessaire uniquement pour créer des tâches "exclusives", renvoie une valeur aléatoire par défaut, appelée comme : function (taskData)
. Sugar : si vous transmettez une chaîne simple, elle sera enveloppée pour fonctionner, qui renvoie toujours cette chaîne.
process (Fonction) - fonction de tâche principale, appelée comme suit : task.process(...args)
. Devrait renvoyer Promise
this (Objet) - tâche en cours.
nouvelle tentative (Nombre) - facultatif, nombre de nouvelles tentatives en cas d'erreur, par défaut 2.
retryDelay (Nombre) - facultatif, délai en ms après les tentatives, par défaut 60 000 ms.
timeout (Nombre) - facultatif, délai d'attente d'exécution, par défaut 120 000 ms.
total (Nombre) - facultatif, valeur de progression maximale, par défaut 1. Si vous ne modifiez pas le comportement, la progression commence par 0 et devient 1 à la fin de la tâche.
reporterDelay (Nombre) - facultatif, si le report est appelé sans délai, le délai est supposé être égal à cette valeur (en millisecondes).
cron (String) - facultatif, chaîne cron ("15 */6 * * *"), valeur nulle par défaut.
piste (numéro) - par défaut 3600000ms (1h). Il est temps de se souvenir des tâches planifiées de cron pour éviter une réexécution si plusieurs serveurs du cluster ont des horloges incorrectes. Ne réglez pas une valeur trop élevée pour les tâches très fréquentes, car cela peut occuper beaucoup de mémoire.
Obtenez la tâche par identifiant. Renvoie une promesse résolue avec une tâche ou avec null
si la tâche n'existe pas.
Champs de tâches que vous pouvez utiliser :
total - progression totale de la tâche
progression - progression de la tâche en cours
résultat - le résultat de la tâche
erreur - l'erreur de tâche
Annuler la tâche. Renvoie une promesse résolue avec une tâche.
Note. Vous pouvez annuler uniquement les tâches sans parent.
Démarrez le travailleur et commencez à consommer les données de la tâche. Return Promise
, résolu lorsque la file d'attente est prête (appelez .ready()
à l'intérieur).
Si pool
a été spécifié dans le constructeur, seules les tâches acheminées vers cette extraction seront consommées.
Arrêtez d'accepter de nouvelles tâches de la file d'attente. Return Promise
, résolu lorsque toutes les tâches actives de ce travailleur sont terminées.
Return Promise
, résolu lorsque la file d'attente est prête à fonctionner (après l'événement 'connect', voir ci-dessous).
Mettre à jour les options du constructeur, à l'exception de redisURL.
idoit
est une instance EventEmitter
qui déclenche certains événements :
ready
lorsque la connexion Redis est établie et que les commandes peuvent être exécutées (les tâches peuvent être enregistrées sans connexion)
error
lorsqu'une erreur s'est produite.
task:progress
, task:progress:<task_id>
- lorsque la mise à jour de la tâche progresse. Les données de l'événement sont : { id, uid, total, progress }
task:end
, task:end:<task_id>
- à la fin de la tâche. Les données de l'événement sont : { id, uid }
Créez une nouvelle tâche avec des paramètres facultatifs.
Remplacez les propriétés de la tâche. Par exemple, vous souhaiterez peut-être attribuer des tâches de groupe/chaîne spécifiques à un autre pool.
Exécutez la tâche immédiatement. Renvoie une promesse résolue avec l'identifiant de la tâche.
Reportez l'exécution de la tâche pour delay
quelques millisecondes (ou pour task.postponeDelay
).
Renvoie une promesse résolue avec l'identifiant de la tâche.
Redémarrez la tâche en cours d'exécution.
add_retry (Boolean) - facultatif, s'il faut augmenter ou non le nombre de tentatives (par défaut : false)
si true
, le nombre de nouvelles tentatives est augmenté et la tâche n'est pas redémarrée en cas de dépassement
si false
, le nombre de tentatives reste le même, donc une tâche peut se redémarrer indéfiniment
delay (Nombre) délai avant le redémarrage en millisecondes (par défaut : task.retryDelay
).
Notez idoit
dispose déjà d'une logique de redémarrage intégrée en cas d'erreur de tâche. Vous ne devriez probablement pas utiliser cette méthode directement. Il est exposé pour des cas très précis.
Augmentez la progression de la tâche actuelle.
Renvoie une promesse résolue avec l'identifiant de la tâche.
Mettre à jour la date limite actuelle de la tâche.
Renvoie une promesse résolue avec l'identifiant de la tâche.
Créez une nouvelle tâche en exécutant les enfants en parallèle.
file d'attente.group([ file d'attente.enfants1(), file d'attente.enfants2(), queue.children3()]).run()
Le résultat du groupe est un tableau non trié de résultats d’enfants.
Créez une nouvelle tâche en exécutant les enfants en série. Si l'un des enfants échoue, la chaîne échoue également.
queue.registerTask('multiply', (a, b) => a * b);queue.registerTask('subtract', (a, b) => a - b);queue.chain([ file d'attente.multiply(2, 3), // 2 * 3 = 6 file d'attente.subtract(10), // 10 - 6 = 4 queue.multiply(3) // 3 * 4 = 12]).run()
Résultat de la tâche précédente transmis comme dernier argument de la tâche suivante. Le résultat de la chaîne est le résultat de la dernière tâche de la chaîne.
Une façon spéciale d'exécuter un mappage énorme dans un style paresseux (à la demande). Voir les commentaires ci-dessous.
// enregistre l'itérateur taskqueue.registerTask({ nom : 'lazy_mapper', baseClass : Queue.Itérateur, // Cette méthode est appelée au début de la tâche et à la fin de chaque enfant. Cela peut être // une fonction génératrice ou une fonction qui renvoie `Promise`. * iterate(state) {// ...// Trois types d'états de sortie possibles : terminé, ne rien faire et nouvelles données.//// 1. `null` - fin atteinte, l'itérateur ne doit plus être appelé.// 2. `{}` - inactif, il y a suffisamment de sous-tâches dans la file d'attente, essayez d'appeler // l'itérateur plus tard (lorsque le prochain enfant aura terminé).// 3. {// state - nouvel état de l'itérateur à retenir (par exemple, décalage pour // requête de base de données), toutes les données sérialisables // tâches - tableau de nouvelles sous-tâches à placer dans la file d'attente // }//// IMPORTANT ! L'itérateur peut être appelé en parallèle à partir de différents travailleurs. Nous // utilisons l'entrée `state` pour résoudre les collisions lors de la mise à jour Redis. Donc, si vous// créez de nouvelles sous-tâches://// 1. le nouvel « état » DOIT être différent (pour tous les états précédents)// 2. Le tableau « tâches » NE DOIT PAS être vide.//// Dans les autres cas, vous devrait signaler 'end' ou 'idle'.//// Une combinaison invalide provoquera 'end' + événement d'erreur.//return { state: newState, tâches: chunksArray}; }});// exécutez iteratorqueue.lazy_mapper().run();
Pourquoi cette folle magie a-t-elle été inventée ?
Imaginez que vous ayez besoin de reconstruire 10 millions de messages sur le forum. Vous souhaitez diviser le travail en petits morceaux égaux, mais les publications n'ont pas d'énumération entière séquentielle, seulement des identifiants mongo. Que pouvez-vous faire ?
Les demandes directes skip
+ limit
sont très coûteuses sur les grandes collections de n'importe quelle base de données.
Vous ne pouvez pas diviser par intervalles de dates, car la densité des publications varie beaucoup du premier au dernier message.
Vous pouvez ajouter un champ indexé avec un nombre aléatoire à chaque publication. Puis divisez par intervalles. Cela fonctionnera, mais entraînera un accès aléatoire au disque – ce n’est pas cool.
La solution consiste à utiliser un mappeur itératif, qui peut mémoriser la "position précédente". Dans ce cas, vous ferez des requêtes range
+ limit
au lieu de skip
+ limit
. Cela fonctionne bien avec les bases de données. Les bonus supplémentaires sont :
Vous n'avez pas besoin de conserver toutes les sous-tâches en file d'attente. Par exemple, vous pouvez créer 100 morceaux et ajouter les 100 suivants lorsque les précédents sont sur le point de se terminer.
La phase de cartographie est distribuée et vous pouvez commencer immédiatement à surveiller la progression totale.
Redis exécuté rapidement via Docker :
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 docker rm redis1
Bien entendu, nous connaissons le kue, le céleri et l’akka. Notre objectif était d'avoir un équilibre entre simplicité et puissance. Nous ne savons donc pas si idoit
fonctionne bien dans un cluster contenant des milliers d'instances. Mais cela devrait convenir dans des volumes plus petits et c'est vraiment facile à utiliser.
kue ne répondait pas à nos besoins, car :
son concept de "priorités" n'est pas flexible et ne protège pas bien des blocages dus aux tâches lourdes
pas de regroupement/chaînage de tâches, etc.
aucune garantie solide de cohérence des données
En idoit
nous nous souciions de :
opérations de groupe de tâches/chaîne et transmission de données entre les tâches (similaire au céleri)
pools de tâches pour isoler l’exécution des tâches par types.
facile à utiliser et à installer (seul Redis est nécessaire, peut s'exécuter dans un processus existant)
cohérence éventuelle des données stockées
sucre essentiel comme planificateur intégré
mappeur itératif pour d'énormes charges utiles (fonctionnalité unique, très utile pour de nombreuses tâches de maintenance)
suivi de l'avancement des tâches
éviter les verrous globaux
Redis peut toujours être un point d'échec, mais c'est un prix acceptable pour la simplicité. Bien sûr, vous pouvez obtenir une meilleure disponibilité via des bus de messages distribués comme RMQ. Mais dans de nombreux cas, il est plus important de garder les choses simples. Avec idoit
vous pouvez réutiliser les technologies existantes sans dépenses supplémentaires.