Механизм очереди задач на базе Redis с расширенным контролем задач и конечной согласованностью.
Группировка задач, цепочка, итераторы для огромных диапазонов.
Отложенное и запланированное выполнение задач.
Распределение нагрузки + рабочие пулы.
Легко встроить.
idoit
обеспечивает расширенный контроль для реализации этого
Группировка . Специальная group
задача выполняет дочерние задачи и ждет, пока все они будут выполнены. Полезно для логики отображения/сокращения.
Цепочка . Специальная chain
задач выполняет дочерние элементы один за другим. Также полезно для сокращения карты или разделения очень сложных задач на более простые шаги.
Итератор отображения . Специальная функция для огромных полезных нагрузок, позволяющая создавать фрагменты по требованию. Преимущества:
Никаких задержек на этапе сопоставления, обработка фрагментов начинается немедленно.
Легко оптимизировать запросы к БД для создания фрагментов одинакового размера (запросы пропуска + ограничения очень медленны при работе с огромными данными).
Прогресс . Когда вы используете сценарии групп/цепочек/карт, общий прогресс легко отслеживать через верхнего родителя. Длинные автономные задачи также могут уведомлять пользователя об изменении хода выполнения.
Рабочие пулы . Вы можете разделить задачи по разным процессам. Например, если вы не хотите, чтобы тяжелые задачи блокировали легкие.
Шедулер . Встроенный cron позволяет выполнять задачи по заданному расписанию.
Все данные в Redis всегда согласованы.
Задача не может быть потеряна, но МОЖЕТ запускаться дважды в крайних случаях (если процесс завершается, когда функция задачи была близка к завершению)
Прогресс может считаться «быстрее», если используется task.progressAdd()
и процесс завершается до завершения задачи. Но это не критично, поскольку такую информацию можно использовать только для обновления индикаторов выполнения интерфейса. В большинстве случаев вы не увидите разницы.
Требуется node.js
6+ и redis
3.0+.
npm установить idoit --save
redisURL (String) — URL-адрес подключения Redis.
параллелизм (число) — максимальное количество задач, которые может выполнять параллельно один работник, по умолчанию 100.
пул (String) — имя рабочего пула, «по умолчанию», если не установлено. Используется, если этот экземпляр очереди использует только задачи (после .start()
). Вы можете направлять задачи определенным пулам исполнителей, чтобы избежать нежелательных блокировок. Вы можете установить pool
Array, [ 'pool1', 'pool2' ]
для использования задач из нескольких пулов (в целях разработки/тестирования).
ns (String) — пространство имен данных, в настоящее время используется как префикс ключей Redis, по умолчанию «idoitqueue:».
Хорошей практикой является создание отдельных рабочих пулов для тяжелых блокирующих и неблокирующих задач. Например, никто не должен блокировать отправку срочных писем. Итак, создайте несколько рабочих процессов, прикрепите их к разным пулам и установите правильную параллельность задач. Неблокирующие задачи могут выполняться параллельно, и вы можете использовать concurrency
по умолчанию = 100. Блокирующие задачи должны выполняться одна за другой, установите concurrency
= 1 для этих рабочих процессов.
Примечание. Может случиться так, что вы удалите некоторые типы задач из своего приложения. В этом случае потерянные данные будут удалены через 3 дня.
Параметры:
name (String) — имя задачи.
baseClass (Функция) — необязательный, конструктор базовой задачи, по умолчанию «Задача».
init (функция) — необязательно, используется для инициализации асинхронной задачи, должно возвращать Promise
this (Объект) — текущая задача (общее количество задач доступно как this.total
).
TaskID (Функция) — необязательно, должно возвращать новый идентификатор задачи. Требуется только для создания «эксклюзивных» задач, по умолчанию возвращает случайное значение, называемое: function (taskData)
. Сахар: если вы передадите простую строку, она будет перенесена в функцию, которая всегда возвращает эту строку.
процесс (Функция) — основная функция задачи, называемая: task.process(...args)
. Должен вернуть Promise
this (Объект) — текущая задача.
повтор (Количество) — необязательно, количество повторов при ошибке, по умолчанию 2.
retryDelay (Число) — необязательно, задержка в мс после повторных попыток, по умолчанию 60000 мс.
таймаут (число) — необязательно, таймаут выполнения, по умолчанию 120000 мс.
итого (число) — необязательно, максимальное значение прогресса, по умолчанию 1. Если вы не измените поведение, прогресс начинается с 0 и становится 1 по завершении задачи.
debugDelay (Число) — необязательно, если отложенный вызов вызывается без задержки, задержка предполагается равной этому значению (в миллисекундах).
cron (String) — необязательно, строка cron («15 */6 * * *»), по умолчанию ноль.
трек (номер) — по умолчанию 3600000мс (1час). Время запомнить запланированные задачи из cron, чтобы избежать повторного запуска, если несколько серверов в кластере имеют неправильные часы. Не устанавливайте слишком высокое значение для очень частых задач, поскольку оно может занимать много памяти.
Получить задачу по идентификатору. Возвращает обещание, решенное с помощью задачи или с null
, если задача не существует.
Поля задач, которые вы можете использовать:
total - общий прогресс задачи
прогресс - текущий прогресс задачи
result - результат задачи
error - ошибка задачи
Отменить задачу. Возвращает обещание, решенное с помощью задачи.
Примечание. Вы можете отменить только задачи без родителя.
Запустите работника и начните потреблять данные задачи. Возвращение Promise
, разрешается, когда очередь готова (вызов .ready()
внутри).
Если pool
был указан в cunstructor, будут использоваться только задачи, направленные на этот запрос.
Прекратите принимать новые задачи из очереди. Return Promise
, разрешается после завершения всех активных задач в этом работнике.
Return Promise
, разрешается, когда очередь готова к работе (после события подключения, см. ниже).
Обновите параметры конструктора, кроме redisURL.
idoit
— это экземпляр EventEmitter
, который запускает некоторые события:
ready
, когда соединение Redis установлено и команды могут выполняться (задачи могут быть зарегистрированы без соединения)
error
, когда произошла ошибка.
task:progress
, task:progress:<task_id>
— когда выполняется обновление задачи. Данные о событии: {id, uid, итог, прогресс}
task:end
, task:end:<task_id>
— когда задача завершается. Данные о событии: { id, uid }
Создайте новую задачу с необязательными параметрами.
Переопределить свойства задачи. Например, вы можете захотеть назначить определенные задачи группы/цепочки другому пулу.
Запустите задачу немедленно. Возвращает обещание, решенное с идентификатором задачи.
Отложить выполнение задачи delay
миллисекунды (или на task.postponeDelay
).
Возвращает обещание, решенное с идентификатором задачи.
Перезапустите текущую задачу.
add_retry (Boolean) — необязательно, следует ли увеличивать количество повторов или нет (по умолчанию: false)
если true
, количество повторов увеличивается, и задача не перезапускается в случае его превышения
если false
, количество повторов остается прежним, поэтому задача может перезапускаться бесконечно
задержка (число) задержка перед перезапуском в миллисекундах (по умолчанию: task.retryDelay
).
Обратите внимание: idoit
уже есть встроенная логика перезапуска при ошибках задачи. Вероятно, вам не следует использовать этот метод напрямую. Это раскрывается для очень конкретных случаев.
Увеличение текущего выполнения задачи.
Возвращает обещание, решенное с идентификатором задачи.
Обновить текущий срок выполнения задачи.
Возвращает обещание, решенное с идентификатором задачи.
Создайте новую задачу, выполняя дочерние элементы параллельно.
очередь.группа([ очередь.дети1(), очередь.дети2(), очередь.детей3()]).run()
Групповой результат — это несортированный массив дочерних результатов.
Создайте новую задачу, последовательно выполняя детей. Если какой-либо из дочерних элементов выходит из строя, то и цепочка тоже выходит из строя.
очередь.registerTask('умножить', (a, b) => a * b);queue.registerTask('вычесть', (a, b) => a - b);queue.chain([ очередь.multiply(2, 3), // 2 * 3 = 6 очередь.вычитание(10), // 10 - 6 = 4 очередь.multiply(3) // 3 * 4 = 12]).run()
Результат предыдущей задачи передается в качестве последнего аргумента следующей задачи. Результатом цепочки является результат последней задачи в цепочке.
Особый способ запуска огромных карт в ленивом стиле (по требованию). Смотрите комментарии ниже.
// регистрируем итератор Taskqueue.registerTask({ имя: 'lazy_mapper', базовыйкласс: Очередь.Итератор, // Этот метод вызывается в начале задачи и в конце каждого дочернего процесса. Это может быть // функция-генератор или функция, возвращающая `Promise`. * iterate(state) {// ...// Возможны три типа выходных состояний: завершено, ничего не делать и новые данные.//// 1. `null` — конец достигнут, итератор больше не должен вызываться.// 2. `{}` - бездействие, в очереди достаточно подзадач, попробуйте вызвать // итератор позже (когда завершится следующий дочерний элемент). // 3. {// состояние - новое состояние итератора, которое нужно запомнить (например, смещение для // БД запрос), любые сериализуемые данные // задачи — массив новых подзадач для помещения в очередь // }//// ВАЖНО! Итератор может вызываться параллельно из разных воркеров. Мы // используем входное состояние для разрешения коллизий при обновлении Redis. Итак, если вы// создаете новые подзадачи://// 1. новое `state` ДОЛЖНО быть другим (для всех предыдущих состояний)// 2. массив `tasks` НЕ ДОЛЖЕН быть пустым.//// В противном случае вы должен сигнализировать о «конце» или «ожидании». //// Неверная комбинация приведет к событию «конец» + ошибка. // return { state: newState, Tasks: chunksArray}; }});// запускаем iteratorqueue.lazy_mapper().run();
Зачем было придумано это безумное волшебство?
Представьте, что вам нужно восстановить 10 миллионов сообщений на форуме. Вы хотите разделить работу на равные небольшие куски, но в сообщениях нет последовательного целочисленного перечисления, есть только идентификаторы монго. Что ты можешь сделать?
Запросы прямого skip
и limit
очень дороги для больших коллекций в любой базе данных.
Вы не можете разделить по интервалам дат, потому что плотность публикаций сильно варьируется от первого к последнему посту.
Вы можете добавить индексированное поле со случайным номером к каждому сообщению. Затем разбить по интервалам. Это сработает, но приведет к произвольному доступу к диску — не круто.
Решение состоит в том, чтобы использовать итеративный картограф, который может запомнить «предыдущую позицию». В этом случае вы будете выполнять запросы range
+ limit
вместо skip
+ limit
. Это хорошо работает с базами данных. Дополнительными бонусами являются:
Вам не нужно держать все подзадачи в очереди. Например, вы можете создать 100 фрагментов и добавить следующие 100, когда предыдущие завершатся.
Фаза картирования становится распределенной, и вы можете немедленно начать отслеживать общий прогресс.
Быстрый запуск Redis через Docker:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 докер РМ Redis1
Конечно, нам знакомы куэ, сельдерей и акка. Нашей целью было найти баланс между простотой и мощью. Итак, мы не знаем, хорошо ли работает idoit
в кластере с тысячами экземпляров. Но в меньших объемах его вполне хватит, и его действительно легко использовать.
kue не подходил для наших нужд, потому что:
концепция «приоритетов» не является гибкой и плохо защищает от блокировок при тяжелых задачах
нет группировки/цепочки задач и т. д.
нет надежных гарантий согласованности данных
В idoit
мы заботились о:
Операции группы/цепочки задач и передача данных между задачами (аналогично сельдерею)
рабочие пулы для изоляции выполнения задач по типам.
прост в использовании и установке (нужен только Redis, может работать в существующем процессе)
конечная согласованность хранимых данных
незаменимый сахар, такой как встроенный планировщик
итеративный картограф для огромных полезных нагрузок (уникальная функция, очень полезная для многих задач обслуживания)
отслеживание прогресса задач
избегать глобальных блокировок
Redis по-прежнему может быть точкой отказа, но это приемлемая цена за простоту. Конечно, вы можете добиться большей доступности с помощью распределенных шин сообщений, таких как RMQ. Но во многих случаях важнее сохранять простоту. С idoit
вы можете повторно использовать существующие технологии без дополнительных затрат.