Turbine suas filas com um front-end profissional:
Obtenha uma visão geral completa de todas as suas filas.
Inspecione vagas, pesquise, tente novamente ou promova vagas atrasadas.
Métricas e estatísticas.
e muitos mais recursos.
Inscreva-se em Taskforce.sh
Uso mínimo da CPU devido a um design sem polling.
Design robusto baseado em Redis.
Trabalhos atrasados.
Agende e repita jobs de acordo com uma especificação cron.
Limitador de taxa para trabalhos.
Novas tentativas.
Prioridade.
Simultaneidade.
Pausar/retomar — globalmente ou localmente.
Vários tipos de trabalho por fila.
Funções de processamento encadeadas (em área restrita).
Recuperação automática de falhas de processo.
E chegando no roteiro...
Confirmação de conclusão do trabalho (enquanto isso, você pode usar o padrão de fila de mensagens).
Relações de trabalho pai-filho.
Existem algumas UIs de terceiros que você pode usar para monitoramento:
TouroMQ
Força-tarefa
Touro v3
Força-tarefa
quadro de touros
resposta de touro
monitor de touros
Monitor
Touro <= v2
Matador
reagir-touro
Toureiro
Com o exportador Prometheus Bull Queue
Como existem algumas soluções de fila de trabalhos, aqui está uma tabela comparando-as:
Dragonfly é um novo substituto imediato do Redis™ que é totalmente compatível com BullMQ e traz algumas vantagens importantes sobre o Redis™, como melhor desempenho massivo ao utilizar todos os núcleos de CPU disponíveis e estruturas de dados mais rápidas e eficientes em termos de memória. Leia mais aqui sobre como usá-lo com BullMQ. | |
Se você precisa de instâncias Redis de produção de alta qualidade para o seu projeto Bull, considere assinar o Memetria for Redis, líder em hospedagem Redis que funciona perfeitamente com BullMQ. Use o código promocional "BULLMQ" ao se inscrever para nos ajudar a patrocinar o desenvolvimento do BullMQ! |
Recurso | BullMQ-Pro | TouroMQ | Touro | Kue | Abelha | Agenda |
---|---|---|---|---|---|---|
Back-end | redis | redis | redis | redis | redis | mongo |
Observáveis | ✓ | |||||
Limite de taxa de grupo | ✓ | |||||
Suporte de grupo | ✓ | |||||
Suporte a lotes | ✓ | |||||
Dependências pai/filho | ✓ | ✓ | ||||
Prioridades | ✓ | ✓ | ✓ | ✓ | ✓ | |
Simultaneidade | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Trabalhos atrasados | ✓ | ✓ | ✓ | ✓ | ✓ | |
Eventos globais | ✓ | ✓ | ✓ | ✓ | ||
Limitador de taxa | ✓ | ✓ | ✓ | |||
Pausar/Retomar | ✓ | ✓ | ✓ | ✓ | ||
Trabalhador em área restrita | ✓ | ✓ | ✓ | |||
Trabalhos repetíveis | ✓ | ✓ | ✓ | ✓ | ||
Operações atômicas | ✓ | ✓ | ✓ | ✓ | ||
Persistência | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
IU | ✓ | ✓ | ✓ | ✓ | ✓ | |
Otimizado para | Trabalhos / Mensagens | Trabalhos / Mensagens | Trabalhos / Mensagens | Empregos | Mensagens | Empregos |
npm instalar touro --save
ou
fio adicionar touro
Requisitos: Bull requer uma versão Redis maior ou igual a 2.8.18
.
npm install @types/bull --save-dev
fio adicionar --dev @types/bull
As definições são atualmente mantidas no repositório DefinitelyTyped.
Aceitamos todos os tipos de contribuições, sejam correções de código, novos recursos ou melhorias de documentos. A formatação do código é imposta por mais bonito. Para commits, siga a convenção de commits convencionais. Todo o código deve passar pelas regras do lint e pelos conjuntos de testes antes de poder ser mesclado no desenvolvimento.
const Queue = require('bull');const videoQueue = new Queue('transcodificação de vídeo', 'redis://127.0.0.1:6379');const audioQueue = new Queue('transcodificação de áudio', { redis: { porta : 6379, host: '127.0.0.1', senha: 'foobared' } }); // Especifique a conexão Redis usando objectconst imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (job, done) { // job.data contém os dados personalizados passados quando o trabalho foi criado // job.id contém o id deste trabalho. // transcodifica o vídeo de forma assíncrona e relata o progresso trabalho.progress(42); //chamada concluída quando terminar feito(); //ou dá um erro se erro pronto(new Error('erro de transcodificação')); // ou passe um resultado feito(nulo, { taxa de quadros: 29,5 /* etc... */ }); // Se o job lançar uma exceção não tratada, ele também será tratado corretamente throw new Error('algum erro inesperado');});audioQueue.process(function (trabalho, concluído) { // transcodifica o áudio de forma assíncrona e relata o progresso trabalho.progress(42); //chamada concluída quando terminar feito(); //ou dá um erro se erro pronto(new Error('erro de transcodificação')); // ou passe um resultado feito(null, { taxa de amostragem: 48000 /* etc... */ }); // Se o job lançar uma exceção não tratada, ele também será tratado corretamente throw new Error('algum erro inesperado');});imageQueue.process(function (trabalho, concluído) { // transcodifica a imagem de forma assíncrona e relata o progresso trabalho.progress(42); //chamada concluída quando terminar feito(); //ou dá um erro se erro pronto(new Error('erro de transcodificação')); // ou passe um resultado pronto(null, {largura: 1280, altura: 720 /* etc... */ }); // Se o job lançar uma exceção não tratada, ele também será tratado corretamente throw new Error('algum erro inesperado');});pdfQueue.process(function (job) { // Os processadores também podem retornar promessas em vez de usar o callback done return pdfAsyncProcessor();});videoQueue.add({ vídeo: 'http://example.com/video1.mov' });audioQueue.add({ áudio: 'http://example.com/audio1.mp3 ' });imageQueue.add({ imagem: 'http://example.com/image1.tiff' });
Alternativamente, você pode retornar promessas em vez de usar o retorno de chamada done
:
videoQueue.process(function (job) { // não se esqueça de remover o retorno de chamada concluído! // Simplesmente retorna uma promessa retornar fetchVideo(job.data.url).then(transcodeVideo); // Lida com rejeição de promessa return Promise.reject(new Error('erro de transcodificação')); // Passa o valor com o qual a promessa é resolvida para o evento "completed" return Promise.resolve({ taxa de quadros: 29,5 /* etc... */ }); // Se o job lançar uma exceção não tratada, ele também será tratado corretamente throw new Error('algum erro inesperado'); // igual a return Promise.reject(new Error('algum erro inesperado'));});
A função de processo também pode ser executada em um processo separado. Isto tem várias vantagens:
O processo está em área restrita, portanto, se travar, não afetará o trabalhador.
Você pode executar o código de bloqueio sem afetar a fila (os trabalhos não serão interrompidos).
Utilização muito melhor de CPUs multi-core.
Menos conexões com redis.
Para utilizar este recurso basta criar um arquivo separado com o processador:
//processador.jsmodule.exports = função (trabalho) { // Faça algum trabalho pesado retornar Promise.resolve(resultado);}
E defina o processador assim:
// Processo único:queue.process('/path/to/my/processor.js');// Você também pode usar simultaneidade:queue.process(5, '/path/to/my/processor.js' );// e processadores nomeados:queue.process('meu processador', 5, '/caminho/para/meu/processador.js');
Um trabalho pode ser adicionado a uma fila e processado repetidamente de acordo com uma especificação cron:
pagamentosQueue.process(function (job) {// Verificar pagamentos }); // Repita o trabalho de pagamento uma vez por dia às 3h15 (da manhã) pagamentosQueue.add(paymentsData, {repetir: { cron: '15 3 * * *' } });
Como dica, verifique suas expressões aqui para verificar se estão corretas: gerador de expressão cron
Uma fila pode ser pausada e retomada globalmente (passe true
para pausar o processamento apenas para este trabalhador):
fila.pause().then(function() { // a fila está pausada agora});queue.resume().then(function () { // a fila é retomada agora})
Uma fila emite alguns eventos úteis, por exemplo...
.on('concluído', function (trabalho, resultado) { // Trabalho concluído com resultado de saída!})
Para obter mais informações sobre eventos, incluindo a lista completa de eventos disparados, confira a referência de eventos
As filas são baratas, então se você precisar de muitas delas basta criar novas com nomes diferentes:
const userJohn = new Queue('john');const userLisa = new Queue('lisa');...
No entanto, cada instância de fila exigirá novas conexões Redis. Verifique como reutilizar conexões ou você também pode usar processadores nomeados para obter um resultado semelhante.
NOTA: A partir da versão 3.2.0 e superior, é recomendado usar processadores encadeados.
As filas são robustas e podem ser executadas em paralelo em vários threads ou processos sem qualquer risco de perigos ou corrupção de fila. Verifique este exemplo simples usando cluster para paralelizar jobs entre processos:
const Queue = require('bull');const cluster = require('cluster');const numWorkers = 8;const queue = new Queue('testar fila simultânea');if (cluster.isMaster) { for (seja i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (worker) {// Vamos criar alguns jobs para a fila workersfor (let i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); cluster.on('exit', função (trabalhador, código, sinal) {console.log('trabalhador' + trabalhador.process.pid + 'morreu'); });} outro { queue.process(function (job, jobDone) {console.log('Trabalho realizado pelo trabalhador', cluster.worker.id, job.id);jobDone(); });}
Para a documentação completa, confira a referência e os padrões comuns:
Guia — Seu ponto de partida para desenvolver com a Bull.
Referência — Documento de referência com todos os objetos e métodos disponíveis.
Padrões — um conjunto de exemplos de padrões comuns.
Licença – a licença Bull – é o MIT.
Se você encontrar algo que precise de mais documentos, envie uma solicitação de pull!
A fila visa uma estratégia de trabalho “pelo menos uma vez”. Isto significa que, em algumas situações, um trabalho pode ser processado mais de uma vez. Isso acontece principalmente quando um trabalhador não consegue manter um bloqueio para um determinado trabalho durante a duração total do processamento.
Quando um trabalhador está processando um trabalho, ele o manterá "bloqueado" para que outros trabalhadores não possam processá-lo.
É importante entender como o bloqueio funciona para evitar que seus trabalhos percam o bloqueio - fiquem paralisados - e sejam reiniciados como resultado. O bloqueio é implementado internamente criando um bloqueio para lockDuration
no intervalo lockRenewTime
(que geralmente é metade lockDuration
). Se lockDuration
decorrer antes que o bloqueio possa ser renovado, o trabalho será considerado paralisado e reiniciado automaticamente; será processado duas vezes . Isso pode acontecer quando:
O processo do Node que executa seu processador de trabalho é encerrado inesperadamente.
Seu processador de trabalho consumia muito CPU e travou o loop de eventos do Node e, como resultado, Bull não conseguiu renovar o bloqueio de trabalho (consulte #488 para saber como podemos detectar isso melhor). Você pode corrigir isso dividindo seu processador de trabalho em partes menores para que nenhuma parte possa bloquear o loop de eventos do Node. Alternativamente, você pode passar um valor maior para a configuração lockDuration
(com a desvantagem de que levará mais tempo para reconhecer um trabalho realmente paralisado).
Dessa forma, você deve sempre ouvir o evento stalled
e registrá-lo em seu sistema de monitoramento de erros, pois isso significa que seus trabalhos provavelmente serão processados duas vezes.
Como medida de segurança para que trabalhos problemáticos não sejam reiniciados indefinidamente (por exemplo, se o processador de trabalhos sempre travar seu processo Node), os trabalhos serão recuperados de um estado paralisado no máximo maxStalledCount
vezes (padrão: 1
).