Улучшите свои очереди с помощью профессионального интерфейса:
Получите полный обзор всех ваших очередей.
Проверяйте задания, ищите, повторяйте или рекламируйте отложенные задания.
Метрики и статистика.
и многие другие функции.
Зарегистрируйтесь на Taskforce.sh.
Минимальное использование ЦП благодаря конструкции без опроса.
Надежный дизайн на основе Redis.
Задержанные работы.
Планируйте и повторяйте задания в соответствии со спецификацией cron.
Ограничитель скорости для заданий.
Повторные попытки.
Приоритет.
Параллелизм.
Пауза/возобновление — глобально или локально.
Несколько типов заданий в очереди.
Поточные (изолированные) функции обработки.
Автоматическое восстановление после сбоев процесса.
И приближаемся к дорожной карте...
Подтверждение завершения задания (пока вы можете использовать шаблон очереди сообщений).
Трудовые отношения между родителями и детьми.
Для мониторинга можно использовать несколько сторонних интерфейсов:
BullMQ
Оперативная группа
Бык v3
Оперативная группа
рекламная доска
бык-репл
бык-монитор
Мониторо
Бык <= v2
Матадор
реакция-бык
Турейро
С экспортером очереди Prometheus Bull
Поскольку существует несколько решений для очереди заданий, вот таблица, сравнивающая их:
Dragonfly — это новая замена Redis™, которая полностью совместима с BullMQ и дает некоторые важные преимущества по сравнению с Redis™, такие как значительно более высокая производительность за счет использования всех доступных ядер ЦП, а также более быстрых и более эффективных структур данных с использованием памяти. Подробнее о том, как использовать его с BullMQ, читайте здесь. | |
Если вам нужны высококачественные рабочие экземпляры Redis для вашего проекта Bull, рассмотрите возможность подписки на Memetria для Redis, лидера хостинга Redis, который идеально работает с BullMQ. Используйте промокод «BULLMQ» при регистрации, чтобы помочь нам спонсировать разработку BullMQ! |
Особенность | BullMQ-Про | BullMQ | Бык | Куэ | Пчела | Повестка дня |
---|---|---|---|---|---|---|
Бэкэнд | Redis | Redis | Redis | Redis | Redis | монго |
Наблюдаемые | ✓ | |||||
Ограничение групповой ставки | ✓ | |||||
Групповая поддержка | ✓ | |||||
Пакетная поддержка | ✓ | |||||
Родительские/дочерние зависимости | ✓ | ✓ | ||||
Приоритеты | ✓ | ✓ | ✓ | ✓ | ✓ | |
Параллелизм | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Отложенные задания | ✓ | ✓ | ✓ | ✓ | ✓ | |
Глобальные события | ✓ | ✓ | ✓ | ✓ | ||
Ограничитель скорости | ✓ | ✓ | ✓ | |||
Пауза/возобновление | ✓ | ✓ | ✓ | ✓ | ||
Рабочий в песочнице | ✓ | ✓ | ✓ | |||
Повторяемые задания | ✓ | ✓ | ✓ | ✓ | ||
Атомные операции | ✓ | ✓ | ✓ | ✓ | ||
Упорство | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
пользовательский интерфейс | ✓ | ✓ | ✓ | ✓ | ✓ | |
Оптимизирован для | Вакансии / Сообщения | Вакансии / Сообщения | Вакансии / Сообщения | Вакансии | Сообщения | Вакансии |
npm установить Bull --save
или
пряжа добавить быка
Требования: Bull требуется версия Redis выше или равная 2.8.18
.
npm install @types/bull --save-dev
пряжа добавить --dev @types/bull
Определения в настоящее время хранятся в репозитории DefiniteTyped.
Мы приветствуем любые виды вкладов: исправления кода, новые функции или улучшения документации. Форматирование кода осуществляется с помощью prettier. Для коммитов следуйте общепринятым соглашениям о коммитах. Весь код должен пройти правила lint и наборы тестов, прежде чем его можно будет объединить с разработкой.
const Queue = require('bull');const videoQueue = new Queue('транскодирование видео', 'redis://127.0.0.1:6379');const audioQueue = new Queue('транскодирование аудио', { redis: {port : 6379, хост: '127.0.0.1', пароль: 'foobared' } }); // Укажите соединение Redis с помощью objectconst imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (job, Done) { // job.data содержит пользовательские данные, переданные при создании задания // job.id содержит идентификатор этой работы. // асинхронно перекодируем видео и сообщаем о прогрессе работа.прогресс(42); // вызов завершен после завершения сделанный(); // или выдать ошибку, если ошибка Done(new Error('ошибка перекодирования')); // или передать результат Done(null, {framerate: 29.5 /* и т.д. */ }); // Если задание выдает необработанное исключение, оно также обрабатывается правильно throw new Error('какая-то неожиданная ошибка');});audioQueue.process(function (job, Done) { // асинхронно перекодируем звук и сообщаем о прогрессе работа.прогресс(42); // вызов завершен после завершения сделанный(); // или выдать ошибку, если ошибка Done(new Error('ошибка перекодирования')); // или передать результат Done(null, { Samplerate: 48000 /* и т.д. */ }); // Если задание выдает необработанное исключение, оно также обрабатывается правильно throw new Error('какая-то неожиданная ошибка');});imageQueue.process(function (job, Done) { // асинхронно перекодируем изображение и сообщаем о прогрессе работа.прогресс(42); // вызов завершен после завершения сделанный(); // или выдать ошибку, если ошибка Done(new Error('ошибка перекодирования')); // или передать результат Done(null, { width: 1280, height: 720 /* и т.д. */ }); // Если задание выдает необработанное исключение, оно также обрабатывается правильно throw new Error('какая-то неожиданная ошибка');});pdfQueue.process(function (job) { // Процессоры также могут возвращать обещания вместо использования обратного вызова Done return pdfAsyncProcessor();});videoQueue.add({ video: 'http://example.com/video1.mov' });audioQueue.add({ audio: 'http://example.com/audio1.mp3 ' });imageQueue.add({ image: 'http://example.com/image1.tiff' });
В качестве альтернативы вы можете возвращать обещания вместо использования обратного вызова done
:
videoQueue.process(function (job) { // не забудьте удалить обратный вызов Done! // Просто возвращаем обещание return fetchVideo(job.data.url).then(transcodeVideo); // Обрабатывает отказ от обещания return Promise.reject(new Error('ошибка перекодирования')); // Передает значение, с которым решено обещание, событию «завершено» return Promise.resolve({framerate: 29.5 /* и т.д. */ }); // Если задание выдает необработанное исключение, оно также обрабатывается правильно throw new Error('какая-то неожиданная ошибка'); // то же, что return Promise.reject(new Error('некоторая неожиданная ошибка'));});
Функцию процесса также можно запустить в отдельном процессе. Это имеет несколько преимуществ:
Процесс изолирован в песочнице, поэтому его сбой не повлияет на работника.
Вы можете запустить блокирующий код, не затрагивая очередь (задания не будут останавливаться).
Гораздо лучшее использование многоядерных процессоров.
Меньше подключений к Redis.
Чтобы использовать эту возможность, достаточно создать отдельный файл с процессором:
// процессор.jsmodule.exports = функция (задание) { // Выполняем тяжелую работу вернуть Promise.resolve(результат);}
И определите процессор следующим образом:
// Одиночный процесс:queue.process('/path/to/my/processor.js');// Вы также можете использовать параллелизм:queue.process(5, '/path/to/my/processor.js' );// и именованные процессоры:queue.process('мой процессор', 5, '/path/to/my/processor.js');
Задание можно добавить в очередь и многократно обрабатывать в соответствии со спецификацией cron:
PaymentQueue.process(function (job) {// Проверка платежей }); // Повторяем платежное задание один раз в день в 3:15 (утра) PaymentQueue.add(PaymentsData, {repeat: { cron: '15 3 * * *' } });
Совет: проверьте свои выражения здесь, чтобы убедиться в их правильности: генератор выражений cron.
Очередь может быть приостановлена и возобновлена глобально (передайте true
, чтобы приостановить обработку только для этого работника):
очередь.пауза().тогда(функция () { // очередь приостановлена});queue.resume().then(function () { // очередь возобновляется})
Очередь генерирует некоторые полезные события, например...
.on('завершено', функция (задание, результат) { // Задание завершено с выводом результата!})
Дополнительную информацию о событиях, включая полный список инициируемых событий, можно найти в справочнике по событиям.
Очереди дешевы, поэтому, если вам нужно их много, просто создавайте новые с разными именами:
const userJohn = новая очередь('john');const userLisa = новая очередь('lisa');...
Однако для каждого экземпляра очереди потребуются новые соединения Redis. Проверьте, как повторно использовать соединения, или вы также можете использовать именованные процессоры для достижения аналогичного результата.
ПРИМЕЧАНИЕ. Начиная с версии 3.2.0 и выше вместо этого рекомендуется использовать многопоточные процессоры.
Очереди надежны и могут выполняться параллельно в нескольких потоках или процессах без какого-либо риска возникновения опасностей или повреждения очереди. Проверьте этот простой пример использования кластера для распараллеливания заданий между процессами:
const Queue = require('bull');const Cluster = require('cluster');const numWorkers = 8;const очереди = new Queue('проверить параллельную очередь');if (cluster.isMaster) { for (let i = 0; i <numWorkers; i++) {cluster.fork(); } Cluster.on('online', function (worker) {// Давайте создадим несколько заданий для работников очереди for (let i = 0; i < 500; i++) {queue.add({ foo: 'bar' }); }; }); Cluster.on('exit', function (worker, code, signal) {console.log('worker' + worker.process.pid + 'умер'); });} еще { очередь.процесс(функция (задание, jobDone) {console.log('Задание выполнено работником', кластер.worker.id, job.id);jobDone(); });}
Для получения полной документации ознакомьтесь со справочными материалами и общими шаблонами:
Руководство — ваша отправная точка для разработки с Bull.
Справочник — Справочный документ со всеми доступными объектами и методами.
Шаблоны — набор примеров распространенных шаблонов.
Лицензия — лицензия Bull — это MIT.
Если вы видите что-либо, требующее дополнительной документации, отправьте запрос на включение!
Очередь нацелена на рабочую стратегию «хотя бы один раз». Это означает, что в некоторых ситуациях задание может быть обработано более одного раза. Чаще всего это происходит, когда работнику не удается сохранить блокировку данного задания в течение всей продолжительности обработки.
Когда работник обрабатывает задание, он будет держать задание «заблокированным», чтобы другие работники не могли его обработать.
Важно понимать, как работает блокировка, чтобы ваши задания не потеряли блокировку ( зависли ) и в результате не перезапустились бы. Блокировка реализуется внутренне путем создания блокировки для lockDuration
на интервале lockRenewTime
(который обычно равен половине lockDuration
). Если lockDuration
истечет до того, как блокировку можно будет возобновить, задание будет считаться остановленным и автоматически перезапускается; оно будет обработано дважды . Это может произойти, когда:
Процесс Node, выполняющий ваш обработчик заданий, неожиданно завершается.
Ваш процессор заданий слишком сильно нагружал процессор и остановил цикл событий Node, в результате чего Bull не смог возобновить блокировку задания (см. #488, чтобы узнать, как лучше это обнаружить). Это можно исправить, разбив обработчик заданий на более мелкие части, чтобы ни одна часть не могла блокировать цикл событий Node. В качестве альтернативы вы можете передать большее значение для параметра lockDuration
(компромисс заключается в том, что распознавание реального остановленного задания займет больше времени).
Таким образом, вам всегда следует прослушивать stalled
событие и регистрировать его в своей системе мониторинга ошибок, поскольку это означает, что ваши задания, скорее всего, будут подвергаться двойной обработке.
В качестве гарантии, что проблемные задания не будут перезапускаться бесконечно (например, если обработчик заданий всегда приводит к сбою своего процесса Node), задания будут восстанавливаться из остановленного состояния максимум раз maxStalledCount
(по умолчанию: 1
).