Mecanismo de fila de tarefas apoiado por Redis com controle avançado de tarefas e consistência eventual.
Agrupamento de tarefas, encadeamento, iteradores para intervalos enormes.
Execução de tarefa adiada e agendada.
Distribuição de carga + pools de trabalhadores.
Fácil de incorporar.
idoit
fornece controle avançado para implementar
Agrupamento . Tarefa group
especial executa tarefas infantis e espera até que todas sejam concluídas. Útil para lógica de mapa/redução.
Encadeamento . A tarefa especial chain
executa as crianças uma por uma. Também é útil para reduzir mapas ou dividir tarefas muito complicadas em etapas mais simples.
Iterador de mapeamento . Recurso especial para cargas enormes, para produzir pedaços sob demanda. Benefícios:
Sem atrasos na fase de mapeamento, o processamento dos pedaços começa imediatamente.
Fácil de otimizar consultas de banco de dados para criar blocos de tamanho igual (consultas de salto + limite são muito lentas em dados enormes).
Progresso . Ao usar cenários de grupos/cadeia/mapa, é fácil monitorar o progresso total por meio do pai superior. Tarefas autônomas longas também podem notificar o usuário sobre alterações no progresso.
Pools de trabalhadores . Você pode dividir tarefas por processos diferentes. Por exemplo, se você não deseja que tarefas pesadas bloqueiem as leves.
Sheduler . O cron integrado permite executar tarefas em um determinado cronograma.
Todos os dados no redis são sempre consistentes.
A tarefa não pode ser perdida, mas PODE ser executada duas vezes em casos extremos (se o processo falhar quando a função da tarefa estiver prestes a terminar)
O progresso pode contar "mais rápido" se task.progressAdd()
for usado e o processo travar antes da conclusão da tarefa. Mas isso não é crítico, já que essas informações podem ser usadas apenas para atualizações das barras de progresso da interface. Na maioria dos casos você não verá a diferença.
node.js
6+ e redis
3.0+ necessários.
npm instalar idoit --save
redisURL (String) - url de conexão redis.
simultaneidade (Número) - máximo de tarefas a serem consumidas em paralelo por um único trabalhador, 100 por padrão.
pool (String) - nome do pool de trabalhadores, "padrão" se não estiver definido. Usado se esta instância de fila consumir apenas tarefas (após .start()
). Você pode rotear tarefas para grupos específicos de trabalhadores para evitar bloqueios indesejados. Você pode definir pool
como Array, [ 'pool1', 'pool2' ]
para consumir tarefas de vários pools (para fins de desenvolvimento/teste).
ns (String) - namespace de dados, atualmente usado como prefixo de chaves redis, "idoitqueue:" por padrão.
É uma boa prática ter pools de trabalhadores separados para tarefas pesadas com bloqueio e sem bloqueio. Por exemplo, ninguém deve bloquear o envio de e-mails urgentes. Portanto, crie vários processos de trabalho, fixe-os em pools diferentes e defina a simultaneidade de tarefas adequada. As tarefas sem bloqueio podem ser consumidas em paralelo e você pode aceitar concurrency
padrão = 100. As tarefas com bloqueio devem ser consumidas uma por uma, defina concurrency
= 1 para esses trabalhadores.
Observação. Pode acontecer que você remova alguns tipos de tarefas do seu aplicativo. Neste caso, os dados órfãos serão apagados após 3 dias.
Opções:
name (String) - o nome da tarefa.
baseClass (Function) - opcional, construtor da tarefa base, "Task" por padrão.
init (Function) - opcional, usado para inicialização de tarefa assíncrona, deve retornar Promise
this (Object) - tarefa atual (o total da tarefa está disponível como this.total
).
taskID (Function) - opcional, deve retornar o novo id da tarefa. Necessário apenas para criar tarefas "exclusivas", retorna um valor aleatório por padrão, chamado como: function (taskData)
. Sugar: se você passar uma string simples, ela será empacotada para a função, que sempre retorna essa string.
process (Function) - função de tarefa principal, chamada como: task.process(...args)
. Deveria retornar Promise
este (Objeto) - tarefa atual.
nova tentativa (Número) - opcional, número de novas tentativas em caso de erro, padrão 2.
retryDelay (Number) - opcional, atraso em ms após novas tentativas, padrão 60.000 ms.
timeout (Número) - opcional, tempo limite de execução, padrão 120.000 ms.
total (Número) - opcional, valor máximo de progresso, padrão 1. Se você não modificar o comportamento, o progresso começa com 0 e se torna 1 no final da tarefa.
posterDelay (Número) - opcional, se adiar for chamado sem atraso, o atraso será considerado igual a este valor (em milissegundos).
cron (String) - opcional, string cron ("15 */6 * * *"), padrão nulo.
faixa (Número) - padrão 3600000ms (1h). É hora de lembrar as tarefas agendadas do cron para evitar reexecuções se vários servidores no cluster tiverem relógios errados. Não defina muito alto para tarefas muito frequentes, pois pode ocupar muita memória.
Obtenha tarefa por id. Retorna uma promessa resolvida com tarefa ou null
se a tarefa não existir.
Campos de tarefa que você pode usar:
total - progresso total da tarefa
progresso - progresso da tarefa atual
resultado - o resultado da tarefa
erro - o erro da tarefa
Cancelar tarefa. Retorna uma promessa resolvida com tarefa.
Observação. Você pode cancelar apenas tarefas sem pai.
Inicie o trabalhador e comece o consumo de dados da tarefa. Retorna Promise
, resolvido quando a fila estiver pronta (chame .ready()
dentro).
Se pool
foi especificado no construtor, apenas as tarefas roteadas para este pull serão consumidas.
Pare de aceitar novas tarefas da fila. Return Promise
, resolvido quando todas as tarefas ativas neste trabalhador forem concluídas.
Return Promise
, resolvido quando a fila estiver pronta para operar (após o evento 'connect', veja abaixo).
Atualize as opções do construtor, exceto redisURL.
idoit
é uma instância EventEmitter
que dispara alguns eventos:
ready
quando a conexão redis estiver ativa e os comandos puderem ser executados (as tarefas podem ser registradas sem conexão)
error
quando ocorreu um erro.
task:progress
, task:progress:<task_id>
- quando a atualização da tarefa está em andamento. Os dados do evento são: {id, uid, total, progress}
task:end
, task:end:<task_id>
- quando a tarefa termina. Os dados do evento são: {id, uid}
Crie uma nova tarefa com parâmetros opcionais.
Substitua as propriedades da tarefa. Por exemplo, você pode desejar atribuir tarefas específicas de grupo/cadeia a outro pool.
Execute a tarefa imediatamente. Retorna uma promessa resolvida com o ID da tarefa.
Adie a execução da tarefa para delay
milissegundos (ou para task.postponeDelay
).
Retorna uma promessa resolvida com o ID da tarefa.
Reinicie a tarefa atualmente em execução.
add_retry (Boolean) - opcional, aumentar ou não a contagem de novas tentativas (padrão: false)
se true
, a contagem de novas tentativas aumenta e a tarefa não é reiniciada caso seja excedida
se false
, a contagem de novas tentativas permanece a mesma, então uma tarefa pode ser reiniciada indefinidamente
delay (Número) atraso antes da reinicialização em milissegundos (padrão: task.retryDelay
).
Observe que idoit
já possui lógica de reinicialização integrada para erros de tarefas. Provavelmente, você não deveria usar esse método diretamente. Está exposto para casos muito específicos.
Aumente o progresso da tarefa atual.
Retorna uma promessa resolvida com o ID da tarefa.
Atualize o prazo da tarefa atual.
Retorna uma promessa resolvida com o ID da tarefa.
Crie uma nova tarefa, executando filhos em paralelo.
fila.grupo([ fila.crianças1(), fila.crianças2(), fila.children3()]).run()
O resultado do grupo é uma matriz não classificada de resultados filhos.
Crie uma nova tarefa, executando crianças em série. Se algum dos filhos falhar, a cadeia também falha.
queue.registerTask('multiplicar', (a, b) => a * b);queue.registerTask('subtrair', (a, b) => a - b);queue.chain([ fila.multiply(2, 3), // 2 * 3 = 6 fila.subtract(10), // 10 - 6 = 4 fila.multiply(3) // 3 * 4 = 12]).run()
O resultado da tarefa anterior passa como último argumento da próxima tarefa. O resultado da cadeia é o resultado da última tarefa da cadeia.
Uma maneira especial de executar mapeamentos enormes em estilo preguiçoso (sob demanda). Veja os comentários abaixo.
// registra o iterador taskqueue.registerTask({ nome: 'lazy_mapper', classe base: Queue.Iterator, // Este método é chamado no início da tarefa e em cada final filho. Pode ser // uma função geradora ou função que retorna `Promise`. * iterate(state) {// ...// Três tipos de estados de saída possíveis: finalizado, não fazer nada e novos dados.//// 1. `null` - final alcançado, o iterador não deve mais ser chamado.// 2. `{}` - inativo, há subtarefas suficientes na fila, tente chamar // o iterador mais tarde (quando o próximo filho terminar). // 3. {// estado - novo estado do iterador a ser lembrado (por exemplo, deslocamento para // consulta db), quaisquer dados serializáveis // tarefas - matriz de novas subtarefas para enviar para a fila // }//// IMPORTANTE! O iterador pode ser chamado em paralelo por diferentes trabalhadores. Nós // usamos a entrada `state` para resolver colisões na atualização do redis. Então, se você// criar novas subtarefas://// 1. o novo `estado` DEVE ser diferente (para todos os estados anteriores)// 2. o array `tasks` NÃO DEVE estar vazio.//// Em outro caso você deve sinalizar sobre 'fim' ou 'inativo'.//// Combinação inválida causará 'fim' + evento de erro.//return { state: newState, tasks: chunksArray}; }});//executa iteratorqueue.lazy_mapper().run();
Por que essa magia maluca foi inventada?
Imagine que você precisa reconstruir 10 milhões de postagens no fórum. Você deseja dividir o trabalho em pequenos pedaços iguais, mas as postagens não têm enumeração inteira sequencial, apenas IDs do mongo. O que você pode fazer?
Solicitações diretas skip
+ limit
são muito caras em grandes coleções em qualquer banco de dados.
Não é possível dividir por intervalos de datas, pois a densidade das postagens varia muito da primeira para a última postagem.
Você pode adicionar um campo indexado com um número aleatório a cada postagem. Em seguida, divida por intervalos. Isso funcionará, mas causará acesso aleatório ao disco - não é legal.
A solução é usar um mapeador iterativo, que pode lembrar a "posição anterior". Nesse caso, você fará solicitações de range
+ limit
em vez de skip
+ limit
. Isso funciona bem com bancos de dados. Os bônus adicionais são:
Você não precisa manter todas as subtarefas na fila. Por exemplo, você pode criar 100 blocos e adicionar os próximos 100 quando os anteriores estiverem prestes a terminar.
A fase de mapeamento é distribuída e você pode começar a monitorar o progresso total imediatamente.
Execute rapidamente o redis via docker:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 docker rm redis1
É claro que estamos familiarizados com kue, aipo e akka. Nosso objetivo era ter um equilíbrio entre simplicidade e poder. Portanto, não sabemos se idoit
funciona bem em cluster com milhares de instâncias. Mas deve funcionar em volumes menores e é muito fácil de usar.
kue não atendeu às nossas necessidades, porque:
seu conceito de "prioridades" não é flexível e não protege bem contra bloqueios por tarefas pesadas
sem agrupamento/encadeamento de tarefas e assim por diante
não há fortes garantias de consistência de dados
No idoit
nos preocupamos com:
operações de grupo/cadeia de tarefas e passagem de dados entre tarefas (semelhante ao aipo)
pools de trabalhadores para isolar a execução de tarefas por tipos.
fácil de usar e instalar (é necessário apenas redis, pode ser executado no processo existente)
eventual consistência dos dados armazenados
açúcar essencial como agendador integrado
mapeador iterativo para cargas enormes (recurso exclusivo, muito útil para muitas tarefas de manutenção)
acompanhamento do progresso da tarefa
evite bloqueios globais
O Redis ainda pode ser um ponto de falha, mas esse é um preço aceitável pela simplicidade. É claro que você pode obter uma melhor disponibilidade por meio de barramentos de mensagens distribuídas como o RMQ. Mas em muitos casos é mais importante manter as coisas simples. Com idoit
você pode reutilizar tecnologias existentes sem despesas adicionais.