Solid Queue — это серверная часть организации очередей на базе БД для Active Job, разработанная с учетом простоты и производительности.
Помимо обычной постановки и обработки заданий, Solid Queue поддерживает отложенные задания, управление параллелизмом, повторяющиеся задания, приостановку очередей, числовые приоритеты для каждого задания, приоритеты по порядку очереди и массовую постановку в очередь ( enqueue_all
для perform_all_later
Active Job ).
Solid Queue можно использовать с базами данных SQL, такими как MySQL, PostgreSQL или SQLite, и он использует предложение FOR UPDATE SKIP LOCKED
, если оно доступно, чтобы избежать блокировки и ожидания блокировок при опросе заданий. Он использует Active Job для повторных попыток, отбрасывания, обработки ошибок, сериализации или задержек и совместим с многопоточностью Ruby on Rails.
Solid Queue настроен по умолчанию в новых приложениях Rails 8. Но если вы используете более раннюю версию, вы можете добавить ее вручную, выполнив следующие действия:
bundle add solid_queue
bin/rails solid_queue:install
Это позволит настроить Solid Queue в качестве бэкэнда рабочего активного задания, создать файлы конфигурации config/queue.yml
и config/recurring.yml
и создать db/queue_schema.rb
. Он также создаст исполняемую оболочку bin/jobs
, которую можно использовать для запуска Solid Queue.
Как только вы это сделаете, вам нужно будет добавить конфигурацию базы данных очереди в config/database.yml
. Если вы используете SQLite, это будет выглядеть так:
production :
primary :
<< : *default
database : storage/production.sqlite3
queue :
<< : *default
database : storage/production_queue.sqlite3
migrations_paths : db/queue_migrate
...или если вы используете MySQL/PostgreSQL/Trilogy:
production :
primary : &primary_production
<< : *default
database : app_production
username : app
password : <%= ENV["APP_DATABASE_PASSWORD"] %>
queue :
<< : *primary_production
database : app_production_queue
migrations_paths : db/queue_migrate
Примечание. Вызов bin/rails solid_queue:install
автоматически добавит config.solid_queue.connects_to = { database: { writing: :queue } }
в config/environments/production.rb
, поэтому дополнительная настройка здесь не требуется (хотя вы должны убедиться, что что вы используете имя queue
в database.yml
, чтобы оно соответствовало!). Но если вы хотите использовать Solid Queue в другой среде (например, в промежуточной среде или даже в разработке), вам придется вручную добавить эту строку config.solid_queue.connects_to
в соответствующий файл среды. И, как всегда, убедитесь, что имя, которое вы используете для базы данных в config/database.yml
соответствует имени, которое вы используете в config.solid_queue.connects_to
.
Затем запустите db:prepare
в рабочей среде, чтобы убедиться, что база данных создана и схема загружена.
Теперь вы готовы начать обработку заданий, запустив bin/jobs
на сервере, который выполняет эту работу. При этом начнется обработка заданий во всех очередях с использованием конфигурации по умолчанию. Подробнее о настройке Solid Queue см. ниже.
Для небольших проектов вы можете запустить Solid Queue на том же компьютере, что и ваш веб-сервер. Когда вы будете готовы к масштабированию, Solid Queue поддерживает горизонтальное масштабирование «из коробки». Вы можете запустить Solid Queue на отдельном от вашего веб-сервера сервере или даже запускать bin/jobs
на нескольких машинах одновременно. В зависимости от конфигурации вы можете назначить на некоторых машинах работать только диспетчеров или только рабочих. Более подробную информацию об этом смотрите в разделе конфигурации.
Примечание . Будущие изменения в схеме будут осуществляться в виде регулярных миграций.
Рекомендуется запускать Solid Queue в отдельной базе данных, но также можно использовать одну базу данных как для приложения, так и для очереди. Просто выполните следующие действия:
db/queue_schema.rb
в обычную миграцию и удалите db/queue_schema.rb
config.solid_queue.connects_to
из production.rb
bin/jobs
У вас не будет нескольких баз данных, поэтому в database.yml
не обязательно должна быть основная база данных и база данных очереди.
Если вы планируете внедрять Solid Queue постепенно, переключая по одному заданию за раз, вы можете сделать это, оставив для config.active_job.queue_adapter
значение старого бэкэнда, а затем queue_adapter
непосредственно в перемещаемых заданиях:
# app/jobs/my_job.rb
class MyJob < ApplicationJob
self . queue_adapter = :solid_queue
# ...
end
Solid Queue был разработан для максимальной пропускной способности при использовании с MySQL 8+ или PostgreSQL 9.5+, поскольку они поддерживают FOR UPDATE SKIP LOCKED
. Вы можете использовать его со старыми версиями, но в этом случае вы можете столкнуться с ожиданием блокировки, если запустите несколько рабочих процессов для одной и той же очереди. Вы также можете использовать его с SQLite в небольших приложениях.
В Solid Queue у нас есть несколько типов актеров:
solid_queue_ready_executions
.solid_queue_scheduled_executions
в таблицу solid_queue_ready_executions
, чтобы работники могли их забрать. Кроме того, они выполняют некоторые работы по обслуживанию, связанные с контролем параллелизма.Супервизор Solid Queue создаст отдельный процесс для каждого контролируемого работника/диспетчера/планировщика.
По умолчанию Solid Queue попытается найти вашу конфигурацию в config/queue.yml
, но вы можете установить другой путь, используя переменную среды SOLID_QUEUE_CONFIG
или используя опцию -c/--config_file
с bin/jobs
, например:
bin/jobs -c config/calendar.yml
Вот как выглядит эта конфигурация:
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
workers :
- queues : " * "
threads : 3
polling_interval : 2
- queues : [ real_time, background ]
threads : 5
polling_interval : 0.1
processes : 3
Все необязательно. Если конфигурация вообще не указана, Solid Queue будет работать с одним диспетчером и одним исполнителем с настройками по умолчанию. Если вы хотите запускать только диспетчеров или воркеров, вам просто нужно включить в конфигурацию только этот раздел. Например, со следующей конфигурацией:
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
супервайзер будет управлять 1 диспетчером и ни одним работником.
Вот обзор различных вариантов:
polling_interval
: интервал времени в секундах, в течение которого работники и диспетчеры будут ждать, прежде чем проверять наличие дополнительных заданий. По умолчанию это время составляет 1
секунду для диспетчеров и 0.1
секунды для рабочих.
batch_size
: диспетчер будет отправлять задания пакетами этого размера. По умолчанию — 500.
concurrency_maintenance_interval
: интервал времени в секундах, в течение которого диспетчер будет ждать перед проверкой заблокированных заданий, которые можно разблокировать. Узнайте больше об элементах управления параллелизмом, чтобы узнать больше об этом параметре. По умолчанию оно составляет 600
секунд.
queues
: список очередей, из которых работники будут выбирать задания. Вы можете использовать *
для обозначения всех очередей (это также значение по умолчанию и поведение, которое вы получите, если пропустите это). Вы можете предоставить одну очередь или список очередей в виде массива. Задания будут опрашиваться из этих очередей по порядку, поэтому, например, с помощью [ real_time, background ]
никакие задания не будут браться из background
, если в real_time
больше не останется заданий. Вы также можете указать префикс с подстановочным знаком для соответствия очередям, начинающимся с префикса. Например:
staging :
workers :
- queues : staging*
threads : 3
polling_interval : 5
Это создаст работника, извлекающего задания из всех очередей, начиная с staging
. Подстановочный знак *
допускается только сам по себе или в конце имени очереди; вы не можете указать имена очередей, такие как *_some_queue
. Они будут проигнорированы.
Наконец, вы можете комбинировать префиксы с точными именами, например [ staging*, background ]
, и поведение в отношении порядка будет таким же, как и при использовании только точных имен.
Ознакомьтесь с разделами ниже, чтобы узнать, как порядок очереди ведет себя в сочетании с приоритетами, и как способ указания очередей для каждого работника может повлиять на производительность.
threads
: это максимальный размер пула потоков, который каждый работник будет использовать для выполнения заданий. Каждый рабочий процесс получит не более указанного количества заданий из своей очереди и отправит их в пул потоков для запуска. По умолчанию это 3
. Эта настройка есть только у рабочих.
processes
: это количество рабочих процессов, которые будут разветвлены супервизором с заданными настройками. По умолчанию это 1
, только один процесс. Этот параметр полезен, если вы хотите выделить очереди или очередям более одного ядра ЦП с одинаковой конфигурацией. Эта настройка есть только у рабочих.
concurrency_maintenance
: будет ли диспетчер выполнять работу по обслуживанию параллелизма. Это true
по умолчанию, и это полезно, если вы не используете какие-либо элементы управления параллелизмом и хотите его отключить, или если вы запускаете несколько диспетчеров и хотите, чтобы некоторые из них просто отправляли задания, не делая ничего другого.
Как упоминалось выше, если вы укажете список очередей для работника, они будут опрошены в указанном порядке, например, для списка real_time,background
, никакие задания не будут взяты из background
если в очереди больше нет заданий. real_time
.
Active Job также поддерживает приоритеты положительных целых чисел при постановке заданий в очередь. В Solid Queue чем меньше значение, тем выше приоритет. Значение по умолчанию — 0
.
Это полезно, когда вы запускаете в одной очереди задания с разной важностью или срочностью. В одной очереди задания будут выбираться в порядке приоритета, но в списке очередей порядок очереди имеет приоритет, поэтому в предыдущем примере с real_time,background
задания в очереди real_time
будут выбираться раньше заданий в background
очередь, даже если те, кто находится в background
очереди, имеют более высокий приоритет (меньшее значение).
Мы рекомендуем не смешивать порядок очереди с приоритетами, а выбирать либо тот, либо другой, поскольку это упростит для вас порядок выполнения заданий.
Чтобы обеспечить производительность опроса и всегда использовать покрывающий индекс, Solid Queue выполняет только два типа запросов опроса:
-- No filtering by queue
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
-- Filtering by a single queue
SELECT job_id
FROM solid_queue_ready_executions
WHERE queue_name = ?
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
Первый (без фильтрации по очереди) используется, когда вы указываете
queues : *
и нет никаких приостановленных очередей, поскольку мы хотим настроить таргетинг на все очереди.
В других случаях нам нужно иметь список очередей для фильтрации по порядку, потому что мы можем фильтровать только по одной очереди за раз, чтобы гарантировать, что мы используем индекс для сортировки. Это означает, что если вы укажете свои очереди как:
queues : beta*
сначала нам нужно получить список всех существующих очередей, соответствующих этому префиксу, с помощью запроса, который будет выглядеть следующим образом:
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
WHERE queue_name LIKE ' beta% ' ;
Этот тип запроса DISTINCT
к столбцу, который является самым левым столбцом в индексе, может быть выполнен очень быстро в MySQL благодаря методу, называемому сканированием свободного индекса. Однако PostgreSQL и SQLite не реализуют этот метод, а это означает, что если ваша таблица solid_queue_ready_executions
очень велика из-за очень глубоких очередей, этот запрос будет выполняться медленно. Обычно ваша таблица solid_queue_ready_executions
будет небольшой, но это может случиться.
Аналогично использованию префиксов, то же самое произойдет, если вы приостановили очереди, поскольку нам нужно получить список всех очередей с помощью запроса типа
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
а затем удалите приостановленные. Пауза вообще должна быть чем-то редким, использоваться в особых обстоятельствах и на короткий период времени. Если вы больше не хотите обрабатывать задания из очереди, лучший способ сделать это — удалить его из списка очередей.
Подводя итог, если вы хотите обеспечить оптимальную производительность при опросе , лучший способ сделать это — всегда указывать для них точные имена и не приостанавливать очереди.
Сделайте это:
queues : background, backend
вместо этого:
queues : back*
Рабочие процессы в Solid Queue используют пул потоков для выполнения работы в нескольких потоках, настраиваемый с помощью параметра threads
выше. Помимо этого, параллелизм может быть достигнут за счет нескольких процессов на одной машине (настраиваемых с помощью разных рабочих процессов или параметра processes
выше) или за счет горизонтального масштабирования.
Супервайзер отвечает за управление этими процессами и реагирует на следующие сигналы:
TERM
, INT
: запускает корректное завершение. Супервизор отправит сигнал TERM
своим контролируемым процессам и будет ждать до времени SolidQueue.shutdown_timeout
, пока они не завершатся. Если к тому времени какие-либо контролируемые процессы все еще будут работать, он отправит им сигнал QUIT
, указывающий, что они должны выйти.QUIT
: начинает немедленное завершение. Супервизор отправит сигнал QUIT
своим контролируемым процессам, заставляя их немедленно завершить работу. При получении сигнала QUIT
, если у рабочих все еще есть задания, они будут возвращены в очередь при отмене регистрации процессов.
Если у процессов нет возможности очиститься перед завершением (например, если кто-то где-то потянул кабель), текущие задания могут остаться востребованными процессами, выполняющими их. Процессы отправляют контрольные сигналы, а супервизор проверяет и удаляет процессы с истекшими контрольными сигналами, в результате чего все заявленные задания возвращаются в их очереди. Вы можете настроить как частоту тактовых импульсов, так и порог, позволяющий считать процесс мертвым. См. раздел ниже.
Вы можете настроить базу данных, используемую Solid Queue, с помощью параметра config.solid_queue.connects_to
в файлах конфигурации config/application.rb
или config/environments/production.rb
. По умолчанию одна база данных используется как для записи, так и для чтения вызываемой queue
в соответствии с конфигурацией базы данных, которую вы настроили во время установки.
Здесь можно использовать все параметры, доступные Active Record для нескольких баз данных.
В очереди Solid вы можете подключиться к двум различным моментам в жизни руководителя:
start
: после того, как супервизор завершил загрузку и прямо перед тем, как он разветвляет рабочих и диспетчеров.stop
: после получения сигнала ( TERM
, INT
или QUIT
) и непосредственно перед началом плавного или немедленного завершения работы.И на два разных момента в жизни рабочего:
worker_start
: после того, как рабочий процесс завершил загрузку и непосредственно перед запуском цикла опроса.worker_stop
: после получения сигнала ( TERM
, INT
или QUIT
) и прямо перед началом корректного или немедленного завершения работы (что является просто exit!
).Для этого вы можете использовать следующие методы с блоком:
SolidQueue . on_start
SolidQueue . on_stop
SolidQueue . on_worker_start
SolidQueue . on_worker_stop
Например:
SolidQueue . on_start { start_metrics_server }
SolidQueue . on_stop { stop_metrics_server }
Их можно вызывать несколько раз для добавления нескольких перехватчиков, но это должно произойти до запуска Solid Queue. Инициализатор был бы хорошим местом для этого.
Примечание . Настройки в этом разделе должны быть установлены в вашем config/application.rb
или конфигурации вашей среды следующим образом: config.solid_queue.silence_polling = true
Существует несколько настроек, управляющих работой Solid Queue, которые вы также можете установить:
logger
: регистратор, который вы хотите использовать Solid Queue. По умолчанию используется регистратор приложений.
app_executor
: исполнитель Rails, используемый для обертывания асинхронных операций, по умолчанию используется исполнитель приложения.
on_thread_error
: пользовательский лямбда-процесс/процесс, вызываемый при возникновении ошибки в потоке Solid Queue, который принимает возникшее исключение в качестве аргумента. По умолчанию
-> ( exception ) { Rails . error . report ( exception , handled : false ) }
Это не используется для ошибок, возникших при выполнении задания . Ошибки, возникающие в заданиях, обрабатываются с помощью retry_on
или discard_on
Active Job и в конечном итоге приводят к сбою заданий. Это касается ошибок, возникающих внутри самой Solid Queue.
use_skip_locked
: следует ли использовать FOR UPDATE SKIP LOCKED
при выполнении блокирующего чтения. В будущем это будет автоматически определяться, а на данный момент вам нужно будет установить значение false
только в том случае, если ваша база данных не поддерживает это. Для MySQL это будут версии < 8, а для PostgreSQL — версии < 9.5. Если вы используете SQLite, это не имеет никакого эффекта, поскольку записи выполняются последовательно.
process_heartbeat_interval
: интервал подтверждения, которому будут следовать все процессы — по умолчанию равен 60 секундам.
process_alive_threshold
: как долго ждать, пока процесс не будет считаться мертвым после его последнего контрольного сигнала — по умолчанию 5 минут.
shutdown_timeout
: время, которое супервизор будет ждать с момента отправки сигнала TERM
своим контролируемым процессам перед отправкой им версии QUIT
с запросом немедленного завершения — по умолчанию 5 секунд.
silence_polling
: отключать ли звук журналов Active Record, создаваемых при опросе как рабочих, так и диспетчеров — по умолчанию true
.
supervisor_pidfile
: путь к pid-файлу, который супервизор создаст при загрузке, чтобы предотвратить запуск более одного супервизора на одном хосте или в случае, если вы хотите использовать его для проверки работоспособности. По умолчанию оно nil
.
preserve_finished_jobs
: сохранять ли завершенные задания в таблице solid_queue_jobs
— по умолчанию true
.
clear_finished_jobs_after
: период хранения завершенных заданий, если preserve_finished_jobs
имеет значение true — по умолчанию 1 день. Примечание. На данный момент автоматическая очистка завершенных заданий не осуществляется. Вам нужно будет делать это, периодически вызывая SolidQueue::Job.clear_finished_in_batches
, но в ближайшем будущем это произойдет автоматически.
default_concurrency_control_period
: значение, которое будет использоваться по умолчанию для параметра duration
в элементах управления параллелизмом. По умолчанию оно составляет 3 минуты.
Solid Queue вызовет SolidQueue::Job::EnqueueError
для любых ошибок Active Record, возникающих при постановке задания в очередь. Причина не поднимать ActiveJob::EnqueueError
заключается в том, что он обрабатывается Active Job, в результате чего perform_later
возвращает false
и устанавливает job.enqueue_error
, переводя задание в блок, который вам нужно передать в perform_later
. Это очень хорошо работает для ваших собственных заданий, но очень затрудняет обработку ошибок для заданий, поставленных в очередь Rails или других драгоценных камней, таких как Turbo::Streams::BroadcastJob
или ActiveStorage::AnalyzeJob
, потому что вы не контролируете вызов perform_later
в таких случаях.
В случае повторяющихся задач, если такая ошибка возникает при постановке в очередь задания, соответствующего задаче, она будет обработана и зарегистрирована, но не появится.
Solid Queue расширяет Active Job элементами управления параллелизмом, которые позволяют вам ограничить количество заданий определенного типа или с определенными аргументами, которые могут выполняться одновременно. При таком ограничении выполнение заданий будет заблокировано и останется заблокированным до тех пор, пока другое задание не завершится и не разблокирует их, или после истечения установленного времени истечения срока действия ( длительности ограничения параллелизма). Работы никогда не отбрасываются и не теряются, а только блокируются.
class MyJob < ApplicationJob
limits_concurrency to : max_concurrent_executions , key : -> ( arg1 , arg2 , ** ) { ... } , duration : max_interval_to_guarantee_concurrency_limit , group : concurrency_group
# ...
key
— единственный обязательный параметр, он может быть символом, строкой или процедурой, которая получает аргументы задания в качестве параметров и будет использоваться для идентификации заданий, которые необходимо ограничить вместе. Если процедура возвращает запись Active Record, ключ будет создан на основе имени и id
ее класса.to
1
.duration
установлено значение SolidQueue.default_concurrency_control_period
, которое по умолчанию составляет 3 minutes
, но вы также можете настроить его.group
используется для управления одновременной работой различных классов заданий. По умолчанию используется имя класса задания. Если задание включает в себя эти элементы управления, мы гарантируем, что максимальное количество заданий ( to
), дающих один и тот же key
, будет выполняться одновременно, и эта гарантия будет действовать в течение duration
для каждого задания, поставленного в очередь. Обратите внимание, что нет никакой гарантии относительно порядка выполнения , а только то, что задания выполняются одновременно (перекрытие).
Ограничения параллелизма используют концепцию семафоров при постановке в очередь и работают следующим образом: когда задание ставится в очередь, мы проверяем, указаны ли в нем элементы управления параллелизмом. Если да, мы проверяем семафор на предмет вычисленного ключа параллелизма. Если семафор открыт, мы запрашиваем его и определяем задание как готовое . Готово означает, что рабочие могут забрать его для выполнения. Когда выполнение задания завершается (успешно или неудачно, что приводит к сбою выполнения), мы сигнализируем семафору и пытаемся разблокировать следующее задание с тем же ключом, если таковой имеется. Разблокирование следующего задания не означает немедленного запуска этого задания, а означает его перевод из состояния заблокированного в состояние готовности . Поскольку может случиться что-то, что помешает первому заданию освободить семафор и разблокировать следующее задание (например, кто-то выдернет вилку из машины, на которой работает исполнитель), мы используем duration
как отказоустойчивую. Задания, которые были заблокированы на срок, превышающий период времени, являются кандидатами на освобождение, но только в таком количестве, которое позволяют правила параллелизма, поскольку каждое из них должно будет пройти проверку танца семафоров. Это означает, что duration
на самом деле связана не с заданием, которое стоит в очереди или выполняется, а с заданиями, которые заблокированы в ожидании.
Например:
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to : 2 , key : -> ( contact ) { contact . account } , duration : 5 . minutes
def perform ( contact )
# ...
Где contact
и account
являются записями ActiveRecord
. В этом случае мы обеспечим одновременное выполнение не более двух заданий типа DeliverAnnouncementToContact
для одной и той же учетной записи. Если по какой-либо причине одно из этих заданий занимает больше 5 минут или не снимает блокировку параллелизма (сигнализирует семафор) в течение 5 минут после ее получения, новое задание с тем же ключом может получить блокировку.
Давайте посмотрим еще один пример с использованием group
:
class Box :: MovePostingsByContactToDesignatedBoxJob < ApplicationJob
limits_concurrency key : -> ( contact ) { contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( contact )
# ...
class Bundle :: RebundlePostingsJob < ApplicationJob
limits_concurrency key : -> ( bundle ) { bundle . contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( bundle )
# ...
В этом случае, если у нас есть задание Box::MovePostingsByContactToDesignatedBoxJob
, поставленное в очередь для записи контакта с идентификатором 123
, и другое Bundle::RebundlePostingsJob
, поставленное одновременно в очередь для записи пакета, ссылающейся на контакт 123
, только одному из них будет разрешено продолжить работу. Другой останется заблокированным до тех пор, пока не закончится первый (или не пройдет 15 минут, что произойдет раньше).
Обратите внимание, что настройка duration
косвенно зависит от значения concurrency_maintenance_interval
, которое вы установили для своих диспетчеров, поскольку это будет частота, с которой заблокированные задания проверяются и разблокируются. В общем, вам следует установить duration
таким образом, чтобы все ваши задания завершались в течение этого времени, и рассматривать задачу обслуживания параллелизма как отказоустойчивую систему на случай, если что-то пойдет не так.
Задания разблокируются в порядке приоритета, но порядок очереди при разблокировке заданий не учитывается. Это означает, что если у вас есть группа заданий, которые используют одну группу параллелизма, но находятся в разных очередях, или задания одного класса, которые вы ставите в разные очереди, порядок очереди, который вы установили для работника, не учитывается при разблокировке заблокированных те. Причина в том, что запущенное задание разблокирует следующее, а само задание не знает о порядке очереди конкретного работника (у вас даже могут быть разные работники с разными порядками очереди), оно может знать только о приоритете. Как только заблокированные задания будут разблокированы и доступны для опроса, они будут подобраны работником в соответствии с порядком его очереди.
Наконец, неудачные задания, которые повторяются автоматически или вручную, работают так же, как и новые задания, поставленные в очередь: они попадают в очередь на получение открытого семафора, и всякий раз, когда они его получают, они запускаются. Не имеет значения, получили ли они уже открытый семафор в прошлом.
Solid Queue не имеет механизма автоматического повтора, для этого он использует Active Job. Задания, которые завершились неудачей, будут храниться в системе, и для них будет создана запись о неудачном выполнении (запись в таблице solid_queue_failed_executions
). Задание будет оставаться там до тех пор, пока оно не будет удалено вручную или повторно не поставлено в очередь. Вы можете сделать это в консоли следующим образом:
failed_execution = SolidQueue :: FailedExecution . find ( ... ) # Find the failed execution related to your job
failed_execution . error # inspect the error
failed_execution . retry # This will re-enqueue the job as if it was enqueued for the first time
failed_execution . discard # This will delete the job from the system
Однако мы рекомендуем взглянуть на Mission_control-jobs — панель мониторинга, где, помимо прочего, вы можете проверять и повторять/отбрасывать неудачные задания.
Некоторые службы отслеживания ошибок, которые интегрируются с Rails, такие как Sentry или Rollbar, подключаются к Active Job и автоматически сообщают о необработанных ошибках, возникающих во время выполнения задания. Однако, если ваша система отслеживания ошибок не поддерживает или вам нужны специальные отчеты, вы можете подключиться к Active Job самостоятельно. Возможный способ сделать это:
# application_job.rb
class ApplicationJob < ActiveJob :: Base
rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Обратите внимание: вам придется продублировать приведенную выше логику и в ActionMailer::MailDeliveryJob
. Это связано с тем, что ActionMailer
не наследуется от ApplicationJob
, а вместо этого использует ActionMailer::MailDeliveryJob
, который наследуется от ActiveJob::Base
.
# application_mailer.rb
class ApplicationMailer < ActionMailer :: Base
ActionMailer :: MailDeliveryJob . rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Мы предоставляем плагин Puma, если вы хотите запустить супервизор Solid Queue вместе с Puma, а также позволить Puma отслеживать и управлять им. Вам просто нужно добавить
plugin :solid_queue
в вашу конфигурацию puma.rb
Поскольку это может быть довольно сложно, и многим людям не нужно об этом беспокоиться, по умолчанию Solid Queue настраивается в другой базе данных в качестве основного приложения, постановка заданий в очередь откладывается до тех пор, пока не будет зафиксирована какая-либо текущая транзакция, благодаря встроенной функции Active Job. возможность сделать это. Это означает, что даже если вы запустите Solid Queue в той же базе данных, что и ваше приложение, вы не сможете воспользоваться преимуществами целостности транзакций.
Если вы предпочитаете изменить это, вы можете установить для config.active_job.enqueue_after_transaction_commit
значение never
. Вы также можете установить это для каждого задания.
Если вы установите для этого параметра значение « never
», но все же хотите убедиться, что вы непреднамеренно не нарушаете целостность транзакций, вы можете убедиться, что:
Ваши задания, основанные на определенных данных, всегда ставятся в очередь с помощью обратных вызовов after_commit
или иным образом из места, где вы уверены, что любые данные, которые будет использовать задание, были зафиксированы в базе данных до того, как задание будет поставлено в очередь.
Или вы настраиваете другую базу данных для Solid Queue, даже если она совпадает с вашим приложением, гарантируя, что для постановки заданий в очередь будет использоваться другое соединение для запросов обработки потоков или выполнения заданий вашего приложения. Например:
class ApplicationRecord < ActiveRecord :: Base
self . abstract_class = true
connects_to database : { writing : :primary , reading : :replica }
config . solid_queue . connects_to = { database : { writing : :primary , reading : :replica } }
Solid Queue поддерживает определение повторяющихся задач, которые выполняются в определенное время в будущем на регулярной основе, например, заданий cron. Они управляются процессом планировщика и определяются в их собственном файле конфигурации. По умолчанию файл находится в config/recurring.yml
, но вы можете установить другой путь, используя переменную среды SOLID_QUEUE_RECURRING_SCHEDULE
или используя параметр --recurring_schedule_file
с bin/jobs
, например:
bin/jobs --recurring_schedule_file=config/schedule.yml
Сама конфигурация выглядит так:
production :
a_periodic_job :
class : MyJob
args : [ 42, { status: "custom_status" } ]
schedule : every second
a_cleanup_task :
command : " DeletedStuff.clear_all "
schedule : every day at 9am
Задачи указываются в виде хеша/словаря, где ключ будет внутренним ключом задачи. Каждая задача должна иметь либо class
, который будет классом задания для постановки в очередь, либо command
, которая будет оцениваться в контексте задания ( SolidQueue::RecurringJob
), которое будет поставлено в очередь в соответствии с его расписанием, в очередь solid_queue_recurring
.
У каждой задачи также должно быть расписание, которое анализируется с помощью Fugit, поэтому оно принимает все, что Fugit принимает в качестве cron. При желании для каждой задачи можно указать следующее:
args
: аргументы, которые будут переданы заданию, в виде одного аргумента, хеша или массива аргументов, который также может включать kwargs в качестве последнего элемента массива.Задание в приведенном выше примере конфигурации будет помещаться в очередь каждую секунду следующим образом:
MyJob . perform_later ( 42 , status : "custom_status" )
queue
: другая очередь, которая будет использоваться при постановке задания в очередь. Если нет, очередь настроена для класса задания.
priority
: числовое значение приоритета, которое будет использоваться при постановке задания в очередь.
Планировщик ставит задачи в соответствующее время, и каждая задача планирует следующую. Это во многом вдохновлено тем, что делает GoodJob.
Можно запустить несколько планировщиков с одной и той же конфигурацией recurring_tasks
, например, если у вас есть несколько серверов для резервирования и вы запускаете scheduler
более чем на одном из них. Чтобы избежать одновременной постановки в очередь повторяющихся задач, запись в новой таблице solid_queue_recurring_executions
создается в той же транзакции, в которой задание ставится в очередь. Эта таблица имеет уникальный индекс для task_key
и run_at
, что гарантирует создание только одной записи для каждой задачи за раз. Это работает только в том случае, если для preserve_finished_jobs
установлено значение true
(по умолчанию), и гарантия действует до тех пор, пока вы сохраняете задания.
Примечание . Поддерживается одно повторяющееся расписание, поэтому вы можете использовать несколько планировщиков, использующих одно и то же расписание, но не несколько планировщиков, использующих разные конфигурации.
Наконец, можно настроить задания, которые не обрабатываются Solid Queue. То есть в вашем приложении может быть такая работа:
class MyResqueJob < ApplicationJob
self . queue_adapter = :resque
def perform ( arg )
# ..
end
end
Вы все равно можете настроить это в Solid Queue:
my_periodic_resque_job :
class : MyResqueJob
args : 22
schedule : " */5 * * * * "
и задание будет поставлено в очередь через perform_later
, чтобы оно запускалось в Resque. Однако в этом случае мы не будем отслеживать для него какие-либо записи solid_queue_recurring_execution
и не будет никаких гарантий, что задание каждый раз будет поставлено в очередь только один раз.
Solid Queue был вдохновлен Resque и GoodJob. Мы рекомендуем ознакомиться с этими проектами, поскольку они являются отличными примерами, на которых мы многому научились.
Гем доступен с открытым исходным кодом в соответствии с условиями лицензии MIT.