Solid Queue は、シンプルさとパフォーマンスを念頭に置いて設計された、アクティブ ジョブ用の DB ベースのキューイング バックエンドです。
通常のジョブのキューへの登録と処理に加え、Solid Queue は遅延ジョブ、同時実行制御、繰り返しジョブ、キューの一時停止、ジョブごとの数値優先順位、キュー順序別の優先順位、および一括エンキュー (アクティブ ジョブのperform_all_later
のenqueue_all
) をサポートします。
Solid Queue は、MySQL、PostgreSQL、SQLite などの SQL データベースで使用でき、可能な場合はFOR UPDATE SKIP LOCKED
句を利用して、ジョブのポーリング時のブロックやロックの待機を回避します。再試行、破棄、エラー処理、シリアル化、遅延をアクティブ ジョブに依存しており、Ruby on Rails のマルチスレッドと互換性があります。
新しい Rails 8 アプリケーションでは、Solid Queue がデフォルトで構成されます。ただし、以前のバージョンを実行している場合は、次の手順に従って手動で追加できます。
bundle add solid_queue
bin/rails solid_queue:install
これにより、Solid Queue が運用アクティブ ジョブ バックエンドとして構成され、構成ファイルconfig/queue.yml
およびconfig/recurring.yml
が作成され、 db/queue_schema.rb
が作成されます。また、Solid Queue の起動に使用できるbin/jobs
実行可能ラッパーも作成されます。
それが完了したら、 config/database.yml
にキュー データベースの構成を追加する必要があります。 SQLite を使用している場合は、次のようになります。
production :
primary :
<< : *default
database : storage/production.sqlite3
queue :
<< : *default
database : storage/production_queue.sqlite3
migrations_paths : db/queue_migrate
...または、MySQL/PostgreSQL/Trilogy を使用している場合:
production :
primary : &primary_production
<< : *default
database : app_production
username : app
password : <%= ENV["APP_DATABASE_PASSWORD"] %>
queue :
<< : *primary_production
database : app_production_queue
migrations_paths : db/queue_migrate
注: bin/rails solid_queue:install
呼び出すと、 config.solid_queue.connects_to = { database: { writing: :queue } }
がconfig/environments/production.rb
に自動的に追加されるため、追加の構成は必要ありません (ただし、これを一致させるには、 database.yml
内のqueue
名を使用する必要があります!)。ただし、Solid Queue を別の環境 (ステージングや開発など) で使用する場合は、 config.solid_queue.connects_to
行をそれぞれの環境ファイルに手動で追加する必要があります。そして、いつものように、 config/database.yml
でデータベースに使用している名前がconfig.solid_queue.connects_to
で使用している名前と一致していることを確認してください。
次に、本番環境でdb:prepare
を実行して、データベースが作成され、スキーマがロードされていることを確認します。
これで、作業を行っているサーバー上でbin/jobs
実行して、ジョブの処理を開始する準備が整いました。これにより、デフォルト設定を使用してすべてのキュー内のジョブの処理が開始されます。 Solid Queue の構成の詳細については、以下を参照してください。
小規模なプロジェクトの場合は、Web サーバーと同じマシン上で Solid Queue を実行できます。スケーリングの準備ができたら、Solid Queue はすぐに使用できる水平スケーリングをサポートします。 Web サーバーとは別のサーバーで Solid Queue を実行したり、複数のマシンでbin/jobs
同時に実行したりすることもできます。構成に応じて、ディスパッチャのみまたはワーカーのみを実行するように一部のマシンを指定できます。詳細については、構成セクションを参照してください。
注: スキーマに対する将来の変更は、定期的な移行の形で行われる予定です。
Solid Queue を別のデータベースで実行することをお勧めしますが、アプリとキューの両方に 1 つのデータベースを使用することも可能です。次の手順に従ってください。
db/queue_schema.rb
の内容を通常の移行にコピーし、 db/queue_schema.rb
を削除します。production.rb
からconfig.solid_queue.connects_to
を削除しますbin/jobs
実行する準備ができました複数のデータベースを持たないため、 database.yml
プライマリ データベースとキュー データベースを含める必要はありません。
一度に 1 つのジョブを切り替えて Solid Queue を段階的に導入することを計画している場合は、 config.active_job.queue_adapter
古いバックエンドに設定したままにし、移動するジョブにqueue_adapter
直接設定することで実現できます。
# app/jobs/my_job.rb
class MyJob < ApplicationJob
self . queue_adapter = :solid_queue
# ...
end
Solid Queue は、 FOR UPDATE SKIP LOCKED
をサポートしているため、MySQL 8 以降または PostgreSQL 9.5 以降で使用した場合に最高のスループットを実現するように設計されています。古いバージョンでも使用できますが、その場合、同じキューに対して複数のワーカーを実行すると、ロック待機が発生する可能性があります。小規模なアプリケーションでは SQLite と一緒に使用することもできます。
Solid Queue にはいくつかのタイプのアクターがあります。
solid_queue_ready_executions
テーブルに基づいて機能します。solid_queue_scheduled_executions
テーブルからsolid_queue_ready_executions
テーブルに移動するだけです。さらに、同時実行制御に関連するメンテナンス作業も行います。Solid Queue のスーパーバイザーは、監視対象のワーカー/ディスパッチャー/スケジューラーごとに個別のプロセスをフォークします。
デフォルトでは、Solid Queue はconfig/queue.yml
で構成を検索しようとしますが、次のように、環境変数SOLID_QUEUE_CONFIG
を使用するか、 bin/jobs
で-c/--config_file
オプションを使用して、別のパスを設定することもできます。
bin/jobs -c config/calendar.yml
この構成は次のようになります。
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
workers :
- queues : " * "
threads : 3
polling_interval : 2
- queues : [ real_time, background ]
threads : 5
polling_interval : 0.1
processes : 3
すべてはオプションです。構成がまったく提供されない場合、Solid Queue はデフォルト設定で 1 つのディスパッチャーと 1 つのワーカーで実行されます。ディスパッチャーまたはワーカーのみを実行したい場合は、そのセクションのみを構成に含める必要があります。たとえば、次の構成の場合:
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
スーパーバイザは 1 つのディスパッチャーを実行し、ワーカーは実行しません。
さまざまなオプションの概要は次のとおりです。
polling_interval
: ワーカーとディスパッチャが追加のジョブをチェックするまで待機する時間間隔 (秒単位)。この時間のデフォルトは、ディスパッチャーの場合は1
秒、ワーカーの場合は0.1
秒です。
batch_size
: ディスパッチャは、このサイズのバッチでジョブをディスパッチします。デフォルトは 500 です。
concurrency_maintenance_interval
: ディスパッチャーが、ブロックを解除できるブロックされたジョブをチェックするまで待機する時間間隔 (秒単位)。この設定の詳細については、同時実行制御の詳細を参照してください。デフォルトは600
秒です。
queues
: ワーカーがジョブを選択するキューのリスト。 *
使用してすべてのキューを示すことができます (これがデフォルトでもあり、これを省略した場合の動作になります)。単一のキューを指定することも、配列としてキューのリストを指定することもできます。ジョブはこれらのキューから順番にポーリングされるため、たとえば[ real_time, background ]
を使用すると、 real_time
で待機しているジョブがなくなっていない限り、 background
からジョブは取得されません。ワイルドカードを使用してプレフィックスを指定して、プレフィックスで始まるキューを照合することもできます。例えば:
staging :
workers :
- queues : staging*
threads : 3
polling_interval : 5
これにより、 staging
から始まるすべてのキューからジョブをフェッチするワーカーが作成されます。ワイルドカード*
は、単独で、またはキュー名の末尾でのみ使用できます。 *_some_queue
などのキュー名を指定することはできません。これらは無視されます。
最後に、 [ staging*, background ]
のようにプレフィックスを正確な名前と組み合わせることができ、順序に関する動作は正確な名前のみの場合と同じになります。
優先順位と組み合わせたキューの順序の動作と、ワーカーごとのキューの指定方法がパフォーマンスにどのような影響を与えるかについては、以下のセクションを確認してください。
threads
: これは、各ワーカーがジョブを実行するために必要なスレッド プールの最大サイズです。各ワーカーは最大でこの数のジョブをキューからフェッチし、実行するためにスレッド プールにポストします。デフォルトでは、これは3
です。この設定はワーカーのみにあります。
processes
: これは、指定された設定でスーパーバイザーによってフォークされるワーカー プロセスの数です。デフォルトでは、これは1
であり、単一のプロセスにすぎません。この設定は、複数の CPU コアを 1 つのキューまたは同じ構成のキュー専用にしたい場合に便利です。この設定はワーカーのみにあります。
concurrency_maintenance
: ディスパッチャが同時実行性のメンテナンス作業を実行するかどうか。これはデフォルトでtrue
であり、同時実行制御を使用していない場合にそれを無効にしたい場合、または複数のディスパッチャを実行していて、一部のディスパッチャが他には何もせずにジョブをディスパッチするだけである場合に便利です。
上で述べたように、ワーカーのキューのリストを指定すると、リストreal_time,background
など、指定された順序でポーリングされます。 background
中のジョブがない限り、バックグラウンドからジョブは取得されません。 real_time
。
Active Job は、ジョブをキューに入れる際の正の整数の優先順位もサポートします。 Solid Queueでは、値が小さいほど優先度が高くなります。デフォルトは0
です。
これは、重要性や緊急性が異なるジョブを同じキューで実行する場合に便利です。同じキュー内ではジョブは優先順位に従って選択されますが、キューのリストではキューの順序が優先されるため、 real_time,background
使用した前の例では、 real_time
キュー内のジョブがバックbackground
のジョブより前に選択されます。 background
キュー内のキューに高い優先順位 (小さい値) が設定されている場合でも、キューに追加されます。
キューの順序と優先順位を混同せず、どちらか一方を選択することをお勧めします。これにより、ジョブの実行順序がよりわかりやすくなります。
ポーリングのパフォーマンスを維持し、カバリング インデックスが常に使用されるようにするために、Solid Queue は次の 2 種類のポーリング クエリのみを実行します。
-- No filtering by queue
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
-- Filtering by a single queue
SELECT job_id
FROM solid_queue_ready_executions
WHERE queue_name = ?
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
最初のもの (キューによるフィルタリングなし) は、次の場合に使用されます。
queues : *
すべてのキューを対象にしたいため、一時停止されているキューはありません。
他の場合には、インデックスを使用して並べ替えることを確実にするために、一度に 1 つのキューでしかフィルターできないため、順番にフィルターするキューのリストが必要になります。これは、キューを次のように指定した場合を意味します。
queues : beta*
まず、次のようなクエリを使用して、そのプレフィックスに一致するすべての既存のキューのリストを取得する必要があります。
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
WHERE queue_name LIKE ' beta% ' ;
インデックスの左端の列に対するこのタイプのDISTINCT
クエリは、Loose Index Scan と呼ばれる手法のおかげで、MySQL で非常に高速に実行できます。ただし、PostgreSQL と SQLite はこの手法を実装していません。つまり、キューが非常に深くなり、 solid_queue_ready_executions
テーブルが非常に大きい場合、このクエリは遅くなります。通常、 solid_queue_ready_executions
テーブルは小さいですが、それが発生する可能性があります。
プレフィックスを使用する場合と同様に、キューを一時停止した場合にも同じことが起こります。これは、次のようなクエリですべてのキューのリストを取得する必要があるためです。
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
そして一時停止したものを削除します。一般に、一時停止はまれなものであり、特殊な状況で短期間使用される必要があります。キューからのジョブをもう処理したくない場合、最善の方法は、キューのリストからジョブを削除することです。
要約すると、ポーリングで最適なパフォーマンスを確保したい場合、そのための最良の方法は、常に正確な名前を指定し、キューを一時停止しないことです。
これを実行します:
queues : background, backend
これの代わりに:
queues : back*
Solid Queue のワーカーは、スレッド プールを使用して複数のスレッドで作業を実行します。これは、上記のthreads
パラメータで構成できます。これに加えて、1 台のマシン上の複数のプロセス (別のワーカーまたは上記のprocesses
パラメーターで構成可能) または水平スケーリングによって並列処理を実現できます。
スーパーバイザはこれらのプロセスの管理を担当し、次の信号に応答します。
TERM
、 INT
: 正常な終了を開始します。スーパーバイザは監視対象プロセスにTERM
シグナルを送信し、プロセスが完了するまで最大SolidQueue.shutdown_timeout
時間まで待機します。それまでに監視対象プロセスがまだ存在している場合は、終了する必要があることを示すQUIT
シグナルをそれらのプロセスに送信します。QUIT
: 即時終了を開始します。スーパーバイザは監視対象プロセスにQUIT
信号を送信し、プロセスを直ちに終了させます。 QUIT
シグナルを受信したときに、ワーカーにまだ実行中のジョブがある場合、プロセスの登録が解除されると、これらのジョブはキューに返されます。
プロセスが終了する前にクリーンアップする機会がない場合 (たとえば、誰かがどこかでケーブルを引っ張った場合)、実行中のジョブは、実行中のプロセスによって要求されたままになる可能性があります。プロセスはハートビートを送信し、スーパーバイザは期限切れのハートビートを持つプロセスをチェックしてプルーニングし、要求されたジョブをキューに戻します。ハートビートの頻度と、プロセスが停止したとみなすしきい値の両方を構成できます。これについては、以下のセクションを参照してください。
Solid Queue で使用されるデータベースはconfig/application.rb
またはconfig/environments/production.rb
構成ファイルのconfig.solid_queue.connects_to
オプションを使用して構成できます。デフォルトでは、インストール中に設定したデータベース構成と一致するように、呼び出されたqueue
の書き込みと読み取りの両方に単一のデータベースが使用されます。
複数のデータベースに対して Active Record で使用できるすべてのオプションをここで使用できます。
Solid キューでは、スーパーバイザの生活の 2 つの異なる時点に接続できます。
start
: スーパーバイザが起動を完了した後、ワーカーとディスパッチャをフォークする直前。stop
: シグナル ( TERM
、 INT
またはQUIT
) を受信した後、正常なシャットダウンまたは即時シャットダウンを開始する直前。そして、労働者の人生における 2 つの異なる時点に分けて説明します。
worker_start
: ワーカーの起動が完了した後、ポーリング ループを開始する直前。worker_stop
: シグナル ( TERM
、 INT
またはQUIT
) を受信した後、正常なシャットダウンまたは即時シャットダウンを開始する直前 (これは単にexit!
)。これを行うには、ブロックで次のメソッドを使用できます。
SolidQueue . on_start
SolidQueue . on_stop
SolidQueue . on_worker_start
SolidQueue . on_worker_stop
例えば:
SolidQueue . on_start { start_metrics_server }
SolidQueue . on_stop { stop_metrics_server }
これらを複数回呼び出して複数のフックを追加できますが、Solid Queue が開始される前に実行する必要があります。これを行うには初期化子が適しています。
注: このセクションの設定は、 config/application.rb
または環境設定で次のように設定する必要があります: config.solid_queue.silence_polling = true
Solid Queue の動作を制御するいくつかの設定があり、これらも設定できます。
logger
: Solid Queue で使用するロガー。デフォルトはアプリロガーです。
app_executor
: 非同期操作をラップするために使用される Rails エグゼキュータ。デフォルトはアプリ エグゼキュータです。
on_thread_error
: 発生した例外を引数として受け取る Solid Queue スレッド内でエラーが発生したときに呼び出すカスタム lambda/Proc。デフォルトは
-> ( exception ) { Rails . error . report ( exception , handled : false ) }
これは、ジョブの実行内で発生したエラーには使用されません。ジョブ内で発生したエラーは、アクティブなジョブのretry_on
またはdiscard_on
によって処理され、最終的にはジョブが失敗します。これは、Solid Queue 自体内で発生したエラーに対するものです。
use_skip_locked
: ロック読み取りを実行するときにFOR UPDATE SKIP LOCKED
使用するかどうか。これは将来的には自動的に検出される予定で、現時点ではデータベースがサポートしていない場合にのみこれをfalse
に設定する必要があります。 MySQL の場合はバージョン 8 未満、PostgreSQL の場合はバージョン 9.5 未満になります。 SQLite を使用する場合、書き込みは順次に行われるため、これは効果がありません。
process_heartbeat_interval
: すべてのプロセスが従うハートビート間隔。デフォルトは 60 秒です。
process_alive_threshold
: 最後のハートビート後にプロセスが停止したとみなされるまでの待機時間 (デフォルトは 5 分)。
shutdown_timeout
: スーパーバイザが監視対象プロセスにTERM
シグナルを送信してから、即時終了を要求するQUIT
バージョンを送信するまでに待機する時間。デフォルトは 5 秒です。
silence_polling
: ワーカーとディスパッチャーの両方をポーリングするときに出力されるアクティブ レコード ログをサイレントにするかどうか (デフォルトはtrue
。
supervisor_pidfile
: 同じホスト内で複数のスーパーバイザが実行されないように、またはヘルス チェックに使用する場合にスーパーバイザが起動時に作成する pidfile へのパス。デフォルトではnil
です。
preserve_finished_jobs
: 完了したジョブをsolid_queue_jobs
テーブルに保持するかどうか (デフォルトはtrue
。
clear_finished_jobs_after
: preserve_finished_jobs
が true の場合に、完了したジョブを保持する期間。デフォルトは 1 日です。注:現時点では、完了したジョブの自動クリーンアップはありません。これを行うには、 SolidQueue::Job.clear_finished_in_batches
定期的に呼び出す必要がありますが、これは近い将来自動的に行われる予定です。
default_concurrency_control_period
: 同時実行制御のduration
パラメーターのデフォルトとして使用される値。デフォルトは 3 分です。
Solid Queue は、ジョブをキューに入れるときにアクティブ レコード エラーが発生すると、 SolidQueue::Job::EnqueueError
を生成します。 ActiveJob::EnqueueError
が発生しない理由は、これが Active Job によって処理されるため、 perform_later
false
を返してjob.enqueue_error
を設定し、ジョブをperform_later
に渡す必要があるブロックに渡すためです。これは独自のジョブでは非常にうまく機能しますが、Rails や他の gem ( Turbo::Streams::BroadcastJob
やActiveStorage::AnalyzeJob
など) によってエンキューされたジョブの場合は、 perform_later
の呼び出しを制御しないため、失敗の処理が非常に困難になります。その場合。
定期的なタスクの場合、そのタスクに対応するジョブをキューに入れるときにそのようなエラーが発生した場合、そのエラーは処理されてログに記録されますが、バブルアップすることはありません。
Solid Queue は、同時実行制御を使用して Active Job を拡張します。これにより、特定のタイプまたは特定の引数を持つジョブを同時に実行できる数を制限できます。この方法で制限すると、ジョブは実行がブロックされ、別のジョブが終了してブロックを解除するか、設定された有効期限 (同時実行制限の期間) が経過するまで、ブロックされたままになります。ジョブは破棄されたり失われたりすることはなく、ブロックされるだけです。
class MyJob < ApplicationJob
limits_concurrency to : max_concurrent_executions , key : -> ( arg1 , arg2 , ** ) { ... } , duration : max_interval_to_guarantee_concurrency_limit , group : concurrency_group
# ...
key
唯一の必須パラメータであり、ジョブ引数をパラメータとして受け取るシンボル、文字列、またはプロシージャのいずれかにすることができ、一緒に制限する必要があるジョブを識別するために使用されます。プロシージャが Active Record レコードを返す場合、キーはクラス名とid
から構築されます。to
のデフォルトは1
です。duration
デフォルトでSolidQueue.default_concurrency_control_period
に設定されており、これ自体のデフォルトは3 minutes
ですが、同様に構成することもできます。group
さまざまなジョブ クラスの同時実行性をまとめて制御するために使用されます。デフォルトはジョブ クラス名です。ジョブにこれらのコントロールが含まれている場合、同じkey
生成する最大数のジョブ ( to
示されている) が同時に実行されることが保証され、この保証はキューに入れられた各ジョブの存続duration
中継続します。実行順序については保証がなく、同時に実行される (重複する) ジョブについてのみ保証されることに注意してください。
同時実行制限は、キューに入れるときにセマフォの概念を使用し、次のように機能します。ジョブがキューに入れられると、ジョブが同時実行制御を指定しているかどうかがチェックされます。存在する場合は、計算された同時実行キーのセマフォをチェックします。セマフォが開いている場合は、それを要求し、ジョブをreadyとして設定します。 Ready は、ワーカーが実行のために取得できることを意味します。ジョブの実行が終了すると (成功または失敗して実行が失敗した場合)、セマフォに信号を送り、同じキーを使用して次のジョブのブロックを解除しようとします (存在する場合)。次のジョブのブロックを解除することは、そのジョブをすぐに実行することを意味するのではなく、ブロックされた状態から準備完了状態に移行することを意味します。最初のジョブがセマフォを解放して次のジョブのブロックを解除することを妨げる何かが発生する可能性があるため (たとえば、ワーカーが実行されているマシンのプラグが誰かに抜かれるなど)、フェイルセーフとしてduration
設けています。一定の期間を超えてブロックされていたジョブは解放の対象となりますが、それぞれのジョブがセマフォ ダンス チェックを通過する必要があるため、同時実行ルールで許可される数だけ解放されます。これは、実際のduration
、キューに入れられているジョブや実行中のジョブに関するものではなく、ブロックされて待機しているジョブに関するものであることを意味します。
例えば:
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to : 2 , key : -> ( contact ) { contact . account } , duration : 5 . minutes
def perform ( contact )
# ...
contact
とaccount
ActiveRecord
レコードです。この場合、同じアカウントに対してDeliverAnnouncementToContact
という種類のジョブが最大 2 つ同時に実行されるようにします。何らかの理由で、これらのジョブの 1 つに 5 分以上かかる場合、またはジョブを取得してから 5 分以内に同時実行ロックが解放されない場合 (セマフォに信号を送信する)、同じキーを持つ新しいジョブがロックを取得する可能性があります。
group
使用した別の例を見てみましょう。
class Box :: MovePostingsByContactToDesignatedBoxJob < ApplicationJob
limits_concurrency key : -> ( contact ) { contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( contact )
# ...
class Bundle :: RebundlePostingsJob < ApplicationJob
limits_concurrency key : -> ( bundle ) { bundle . contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( bundle )
# ...
この場合、ID 123
の連絡先レコードに対してBox::MovePostingsByContactToDesignatedBoxJob
ジョブがエンキューされ、 contact 123
参照するバンドル レコードに対して同時にエンキューされた別のBundle::RebundlePostingsJob
ジョブがある場合、そのうちの 1 つだけが続行を許可されます。もう 1 つは、最初のものが終了するまで (または、先に何が起こっても 15 分が経過するまで) ブロックされたままになります。
duration
設定は、ディスパッチャーに設定したconcurrency_maintenance_interval
の値に間接的に依存することに注意してください。これは、ブロックされたジョブがチェックされ、ブロック解除される頻度となるためです。一般に、すべてのジョブがその期間内で適切に完了するようにduration
設定し、同時実行性の維持タスクを何か問題が発生した場合のフェイルセーフとして考える必要があります。
ジョブは優先順位に従ってブロック解除されますが、ジョブのブロック解除ではキューの順序は考慮されません。つまり、同時実行グループを共有しているが別のキューにあるジョブのグループ、または別のキューにエンキューしている同じクラスのジョブがある場合、ブロックされているブロックを解除するときに、ワーカーに設定したキューの順序は考慮されません。もの。その理由は、実行されるジョブは次のジョブのブロックを解除し、ジョブ自体は特定のワーカーのキュー順序については認識せず (異なるキュー順序を持つ別のワーカーが存在する可能性もあります)、優先度についてのみ知ることができるためです。ブロックされたジョブのブロックが解除され、ポーリングが可能になると、キューの順序に従ってワーカーによってジョブが選択されます。
最後に、自動または手動で再試行される失敗したジョブは、キューに入れられる新しいジョブと同じように機能します。オープン セマフォを取得するためにキューに入れられ、セマフォを取得するたびに実行されます。過去にすでにオープン セマフォを取得していたかどうかは関係ありません。
Solid Queue には自動再試行メカニズムが含まれておらず、これはアクティブ ジョブに依存しています。失敗したジョブはシステムに保持され、これらのジョブに対して失敗した実行( solid_queue_failed_executions
テーブル内のレコード) が作成されます。ジョブは、手動で破棄されるか再エンキューされるまで、そこに残ります。これはコンソールで次のように実行できます。
failed_execution = SolidQueue :: FailedExecution . find ( ... ) # Find the failed execution related to your job
failed_execution . error # inspect the error
failed_execution . retry # This will re-enqueue the job as if it was enqueued for the first time
failed_execution . discard # This will delete the job from the system
ただし、特に、失敗したジョブを調べて再試行/破棄できるダッシュボードである、mission_control-jobs を確認することをお勧めします。
Sentry や Rollbar など、Rails と統合された一部のエラー追跡サービスは、アクティブ ジョブにフックし、ジョブの実行中に発生した未処理のエラーを自動的に報告します。ただし、エラー追跡システムがそうでない場合、またはカスタム レポートが必要な場合は、自分でアクティブ ジョブに接続できます。これを行うには次のような方法が考えられます。
# application_job.rb
class ApplicationJob < ActiveJob :: Base
rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
ActionMailer::MailDeliveryJob
でも上記のロジックを複製する必要があることに注意してください。これは、 ActionMailer
ApplicationJob
を継承せず、代わりにActiveJob::Base
を継承するActionMailer::MailDeliveryJob
を使用するためです。
# application_mailer.rb
class ApplicationMailer < ActionMailer :: Base
ActionMailer :: MailDeliveryJob . rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Solid Queue のスーパーバイザーを Puma と一緒に実行し、Puma に監視および管理させたい場合は、Puma プラグインを提供します。追加するだけです
plugin :solid_queue
puma.rb
設定に追加します。
これは非常に注意が必要なため、多くの人は心配する必要はありません。デフォルトでは、Solid Queue はメイン アプリとして別のデータベースに構成され、アクティブ ジョブの組み込み機能のおかげで、進行中のトランザクションがコミットされるまでジョブのキューへの登録は延期されます。これを行う能力。これは、アプリと同じ DB で Solid Queue を実行したとしても、このトランザクションの整合性を利用できないことを意味します。
これを変更したい場合は、 config.active_job.enqueue_after_transaction_commit
never
に設定できます。これをジョブごとに設定することもできます。
これをnever
に設定しても、トランザクションの整合性が誤って保たれていないことを確認したい場合は、次のことを確認できます。
特定のデータに依存するジョブは、 after_commit
コールバック、またはジョブがキューに入れられる前にジョブが使用するデータがデータベースにコミットされていることが確実な場所から常にエンキューされます。
または、アプリケーションと同じデータベースであっても、Solid Queue 用に別のデータベースを構成し、アプリケーションのリクエストを処理したりジョブを実行したりするスレッド上の別の接続がジョブをキューに入れるために使用されるようにします。例えば:
class ApplicationRecord < ActiveRecord :: Base
self . abstract_class = true
connects_to database : { writing : :primary , reading : :replica }
config . solid_queue . connects_to = { database : { writing : :primary , reading : :replica } }
Solid Queue は、cron ジョブのように、将来の特定の時間に定期的に実行される繰り返しタスクの定義をサポートします。これらはスケジューラ プロセスによって管理され、独自の構成ファイルで定義されます。デフォルトでは、ファイルはconfig/recurring.yml
にありますが、次のように、環境変数SOLID_QUEUE_RECURRING_SCHEDULE
を使用するか、 bin/jobs
で--recurring_schedule_file
オプションを使用することによって、別のパスを設定できます。
bin/jobs --recurring_schedule_file=config/schedule.yml
構成自体は次のようになります。
production :
a_periodic_job :
class : MyJob
args : [ 42, { status: "custom_status" } ]
schedule : every second
a_cleanup_task :
command : " DeletedStuff.clear_all "
schedule : every day at 9am
タスクはハッシュ/辞書として指定され、キーは内部的にタスクのキーになります。各タスクには、キューに入れるジョブ クラスとなるclass
、またはスケジュールに従ってキューに入れられるジョブ ( SolidQueue::RecurringJob
) のコンテキストで評価されるcommand
のいずれかが必要です。 solid_queue_recurring
キュー。
各タスクにはスケジュールも必要です。スケジュールは Fugit を使用して解析されるため、Fugit が cron として受け入れるものはすべて受け入れられます。オプションで、タスクごとに次のものを指定できます。
args
: 単一の引数、ハッシュ、または配列の最後の要素として kwargs を含めることもできる引数の配列として、ジョブに渡される引数。上記の構成例のジョブは、次のように毎秒キューに入れられます。
MyJob . perform_later ( 42 , status : "custom_status" )
queue
: ジョブをキューに入れるときに使用される別のキュー。存在しない場合は、ジョブ クラスに対してキューが設定されます。
priority
: ジョブをキューに入れるときに使用される優先順位の数値。
タスクはスケジューラによって対応する時間にキューに入れられ、各タスクが次のタスクをスケジュールします。これは、GoodJob が行っていることにかなり影響を受けています。
たとえば、冗長性のために複数のサーバーがあり、そのうちの複数のサーバーでスケジューラーを実行する場合、同じrecurring_tasks
構成で複数のscheduler
を実行することができます。重複したタスクが同時にキューに入れられるのを避けるために、ジョブがキューに入れられるのと同じトランザクション内に新しいsolid_queue_recurring_executions
テーブルのエントリが作成されます。このテーブルには、 task_key
およびrun_at
に一意のインデックスがあり、タスクごとに 1 回につき 1 つのエントリのみが作成されることが保証されます。これは、 preserve_finished_jobs
true
(デフォルト) に設定している場合にのみ機能し、ジョブを保持している限り保証が適用されます。
注: 単一の繰り返しスケジュールがサポートされているため、同じスケジュールを使用する複数のスケジューラを使用できますが、異なる構成を使用する複数のスケジューラを使用することはできません。
最後に、Solid Queue によって処理されないジョブを構成することができます。つまり、アプリ内で次のようなジョブを実行できます。
class MyResqueJob < ApplicationJob
self . queue_adapter = :resque
def perform ( arg )
# ..
end
end
これは引き続き Solid Queue で設定できます。
my_periodic_resque_job :
class : MyResqueJob
args : 22
schedule : " */5 * * * * "
ジョブは、 perform_later
経由でキューに入れられるため、Resque で実行されます。ただし、この場合、 solid_queue_recurring_execution
レコードは追跡されず、ジョブが毎回 1 回だけキューに入れられるという保証はありません。
Solid Queue は、resque と GoodJob からインスピレーションを得て作られました。これらのプロジェクトは私たちが多くを学んだ素晴らしい例であるため、チェックすることをお勧めします。
この gem は、MIT ライセンスの条件に基づいてオープン ソースとして利用できます。