Solid Queue adalah backend antrian berbasis DB untuk Pekerjaan Aktif, dirancang dengan mempertimbangkan kesederhanaan dan kinerja.
Selain enqueuing dan pemrosesan pekerjaan reguler, Solid Queue mendukung pekerjaan tertunda, kontrol konkurensi, pekerjaan berulang, jeda antrian, prioritas numerik per pekerjaan, prioritas berdasarkan urutan antrian, dan enqueueing massal ( enqueue_all
untuk perform_all_later
Pekerjaan Aktif).
Antrian Padat dapat digunakan dengan database SQL seperti MySQL, PostgreSQL atau SQLite, dan memanfaatkan klausa FOR UPDATE SKIP LOCKED
, jika tersedia, untuk menghindari pemblokiran dan menunggu kunci saat melakukan pemungutan suara. Ini bergantung pada Pekerjaan Aktif untuk percobaan ulang, pembuangan, penanganan kesalahan, serialisasi, atau penundaan, dan kompatibel dengan multi-threading Ruby on Rails.
Antrean Padat dikonfigurasikan secara default di aplikasi Rails 8 baru. Namun jika Anda menjalankan versi sebelumnya, Anda dapat menambahkannya secara manual dengan mengikuti langkah-langkah berikut:
bundle add solid_queue
bin/rails solid_queue:install
Ini akan mengonfigurasi Solid Queue sebagai backend Pekerjaan Aktif produksi, membuat file konfigurasi config/queue.yml
dan config/recurring.yml
, dan membuat db/queue_schema.rb
. Ini juga akan membuat pembungkus bin/jobs
yang dapat dieksekusi yang dapat Anda gunakan untuk memulai Solid Queue.
Setelah Anda selesai melakukannya, Anda harus menambahkan konfigurasi untuk database antrian di config/database.yml
. Jika Anda menggunakan SQLite, tampilannya akan seperti ini:
production :
primary :
<< : *default
database : storage/production.sqlite3
queue :
<< : *default
database : storage/production_queue.sqlite3
migrations_paths : db/queue_migrate
...atau jika Anda menggunakan MySQL/PostgreSQL/Trilogi:
production :
primary : &primary_production
<< : *default
database : app_production
username : app
password : <%= ENV["APP_DATABASE_PASSWORD"] %>
queue :
<< : *primary_production
database : app_production_queue
migrations_paths : db/queue_migrate
Catatan: Memanggil bin/rails solid_queue:install
akan secara otomatis menambahkan config.solid_queue.connects_to = { database: { writing: :queue } }
ke config/environments/production.rb
, jadi tidak diperlukan konfigurasi tambahan di sana (walaupun Anda harus memastikannya bahwa Anda menggunakan nama queue
di database.yml
agar cocok!). Namun jika Anda ingin menggunakan Solid Queue di lingkungan yang berbeda (seperti staging atau bahkan pengembangan), Anda harus menambahkan baris config.solid_queue.connects_to
secara manual ke file lingkungan masing-masing. Dan, seperti biasa, pastikan nama yang Anda gunakan untuk database di config/database.yml
cocok dengan nama yang Anda gunakan di config.solid_queue.connects_to
.
Kemudian jalankan db:prepare
dalam produksi untuk memastikan database dibuat dan skema dimuat.
Sekarang Anda siap untuk mulai memproses pekerjaan dengan menjalankan bin/jobs
di server yang melakukan pekerjaan tersebut. Ini akan mulai memproses pekerjaan di semua antrian menggunakan konfigurasi default. Lihat di bawah untuk mempelajari lebih lanjut tentang mengonfigurasi Antrean Padat.
Untuk proyek kecil, Anda dapat menjalankan Solid Queue di mesin yang sama dengan server web Anda. Saat Anda siap melakukan penskalaan, Solid Queue mendukung penskalaan horizontal yang siap digunakan. Anda dapat menjalankan Solid Queue di server terpisah dari server web Anda, atau bahkan menjalankan bin/jobs
di beberapa mesin secara bersamaan. Bergantung pada konfigurasinya, Anda dapat menetapkan beberapa mesin untuk menjalankan operator saja atau pekerja saja. Lihat bagian konfigurasi untuk rincian lebih lanjut tentang ini.
Catatan : perubahan skema di masa mendatang akan dilakukan dalam bentuk migrasi reguler.
Disarankan untuk menjalankan Antrian Padat dalam database terpisah, namun dimungkinkan juga untuk menggunakan satu database tunggal untuk aplikasi dan antrean. Cukup ikuti langkah-langkah ini:
db/queue_schema.rb
ke migrasi normal dan hapus db/queue_schema.rb
config.solid_queue.connects_to
dari production.rb
bin/jobs
Anda tidak akan memiliki banyak database, jadi database.yml
tidak perlu memiliki database utama dan database antrian.
Jika Anda berencana untuk mengadopsi Solid Queue secara bertahap dengan mengganti satu pekerjaan pada saat itu, Anda dapat melakukannya dengan membiarkan config.active_job.queue_adapter
disetel ke backend lama, lalu menyetel queue_adapter
langsung di tugas yang Anda pindahkan:
# app/jobs/my_job.rb
class MyJob < ApplicationJob
self . queue_adapter = :solid_queue
# ...
end
Solid Queue dirancang untuk throughput tertinggi saat digunakan dengan MySQL 8+ atau PostgreSQL 9.5+, karena mendukung FOR UPDATE SKIP LOCKED
. Anda dapat menggunakannya dengan versi yang lebih lama, namun dalam kasus ini, Anda mungkin mengalami lock wait jika menjalankan beberapa pekerja untuk antrean yang sama. Anda juga dapat menggunakannya dengan SQLite pada aplikasi yang lebih kecil.
Kami memiliki beberapa jenis aktor di Solid Queue:
solid_queue_ready_executions
.solid_queue_scheduled_executions
ke tabel solid_queue_ready_executions
sehingga pekerja dapat mengambilnya. Selain itu, mereka melakukan beberapa pekerjaan pemeliharaan terkait kontrol konkurensi.Supervisor Solid Queue akan melakukan proses terpisah untuk setiap pekerja/pengirim/penjadwal yang diawasi.
Secara default, Solid Queue akan mencoba menemukan konfigurasi Anda di bawah config/queue.yml
, namun Anda dapat menyetel jalur berbeda menggunakan variabel lingkungan SOLID_QUEUE_CONFIG
atau dengan menggunakan opsi -c/--config_file
dengan bin/jobs
, seperti ini:
bin/jobs -c config/calendar.yml
Seperti inilah konfigurasinya:
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
workers :
- queues : " * "
threads : 3
polling_interval : 2
- queues : [ real_time, background ]
threads : 5
polling_interval : 0.1
processes : 3
Semuanya opsional. Jika tidak ada konfigurasi sama sekali yang disediakan, Solid Queue akan berjalan dengan satu operator dan satu pekerja dengan pengaturan default. Jika Anda hanya ingin menjalankan operator atau pekerja, Anda hanya perlu menyertakan bagian tersebut saja dalam konfigurasi. Misalnya dengan konfigurasi berikut:
production :
dispatchers :
- polling_interval : 1
batch_size : 500
concurrency_maintenance_interval : 300
supervisor akan menjalankan 1 petugas operator dan tidak ada pekerja.
Berikut ikhtisar berbagai opsi:
polling_interval
: interval waktu dalam hitungan detik dimana pekerja dan petugas operator akan menunggu sebelum memeriksa pekerjaan lainnya. Waktu ini defaultnya adalah 1
detik untuk petugas operator dan 0.1
detik untuk pekerja.
batch_size
: petugas operator akan mengirimkan pekerjaan dalam batch sebesar ini. Standarnya adalah 500.
concurrency_maintenance_interval
: interval waktu dalam detik di mana petugas operator akan menunggu sebelum memeriksa pekerjaan yang diblokir dan dapat dibuka blokirnya. Baca selengkapnya tentang kontrol konkurensi untuk mempelajari lebih lanjut tentang pengaturan ini. Standarnya adalah 600
detik.
queues
: daftar antrian dimana pekerja akan memilih pekerjaan. Anda dapat menggunakan *
untuk menunjukkan semua antrean (yang juga merupakan default dan perilaku yang akan Anda dapatkan jika menghilangkannya). Anda dapat memberikan antrian tunggal, atau daftar antrian sebagai array. Pekerjaan akan disurvei dari antrean tersebut secara berurutan, jadi misalnya, dengan [ real_time, background ]
, tidak ada pekerjaan yang diambil dari background
kecuali tidak ada lagi pekerjaan yang menunggu di real_time
. Anda juga dapat memberikan awalan dengan wildcard untuk mencocokkan antrean yang dimulai dengan awalan. Misalnya:
staging :
workers :
- queues : staging*
threads : 3
polling_interval : 5
Ini akan membuat pekerja mengambil pekerjaan dari semua antrian yang dimulai dengan staging
. Wildcard *
hanya diperbolehkan pada dirinya sendiri atau di akhir nama antrian; Anda tidak dapat menentukan nama antrian seperti *_some_queue
. Ini akan diabaikan.
Terakhir, Anda dapat menggabungkan awalan dengan nama persis, seperti [ staging*, background ]
, dan perilaku terkait urutan akan sama dengan hanya menggunakan nama persis.
Periksa bagian di bawah tentang bagaimana perilaku urutan antrean dikombinasikan dengan prioritas, dan bagaimana cara Anda menentukan antrean per pekerja dapat memengaruhi kinerja.
threads
: ini adalah ukuran maksimal kumpulan thread yang harus dimiliki setiap pekerja untuk menjalankan pekerjaan. Setiap pekerja akan mengambil jumlah pekerjaan ini paling banyak dari antreannya dan akan mempostingnya ke kumpulan thread untuk dijalankan. Secara default, ini adalah 3
. Hanya pekerja yang memiliki pengaturan ini.
processes
: ini adalah jumlah proses pekerja yang akan dicabangkan oleh supervisor dengan pengaturan yang diberikan. Secara default, ini adalah 1
, hanya satu proses. Pengaturan ini berguna jika Anda ingin mendedikasikan lebih dari satu inti CPU ke antrian atau antrian dengan konfigurasi yang sama. Hanya pekerja yang memiliki pengaturan ini.
concurrency_maintenance
: apakah petugas operator akan melakukan pekerjaan pemeliharaan konkurensi. Hal ini true
secara default, dan berguna jika Anda tidak menggunakan kontrol konkurensi apa pun dan ingin menonaktifkannya atau jika Anda menjalankan beberapa operator dan ingin beberapa di antaranya hanya mengirimkan tugas tanpa melakukan hal lain.
Seperti disebutkan di atas, jika Anda menentukan daftar antrean untuk seorang pekerja, antrean tersebut akan disurvei sesuai urutan yang diberikan, misalnya untuk daftar real_time,background
, tidak ada pekerjaan yang akan diambil dari background
kecuali tidak ada lagi pekerjaan yang menunggu di real_time
.
Pekerjaan Aktif juga mendukung prioritas bilangan bulat positif ketika mengantri pekerjaan. Pada Solid Queue, semakin kecil nilainya, semakin tinggi prioritasnya. Standarnya adalah 0
.
Ini berguna ketika Anda menjalankan pekerjaan dengan kepentingan atau urgensi berbeda dalam antrean yang sama. Dalam antrian yang sama, pekerjaan akan dipilih berdasarkan prioritas, tetapi dalam daftar antrian, urutan antrian akan diutamakan, jadi pada contoh sebelumnya dengan real_time,background
, pekerjaan di antrian real_time
akan dipilih sebelum pekerjaan di background
antrian, meskipun antrian di background
mempunyai prioritas yang lebih tinggi (nilai lebih kecil).
Kami menyarankan untuk tidak mencampurkan urutan antrean dengan prioritas, melainkan memilih salah satu, karena hal ini akan membuat urutan pelaksanaan pekerjaan lebih mudah bagi Anda.
Untuk menjaga kinerja jajak pendapat dan memastikan indeks penutup selalu digunakan, Solid Queue hanya melakukan dua jenis kueri jajak pendapat:
-- No filtering by queue
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
-- Filtering by a single queue
SELECT job_id
FROM solid_queue_ready_executions
WHERE queue_name = ?
ORDER BY priority ASC , job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
Yang pertama (tidak ada pemfilteran berdasarkan antrian) digunakan saat Anda menentukan
queues : *
dan tidak ada antrian yang dijeda, karena kami ingin menargetkan semua antrian.
Dalam kasus lain, kita perlu memiliki daftar antrean untuk difilter, secara berurutan, karena kita hanya dapat memfilter berdasarkan satu antrean dalam satu waktu untuk memastikan kita menggunakan indeks untuk mengurutkan. Ini berarti jika Anda menentukan antrian Anda sebagai:
queues : beta*
kita harus mendapatkan daftar semua antrean yang cocok dengan awalan tersebut terlebih dahulu, dengan kueri yang akan terlihat seperti ini:
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
WHERE queue_name LIKE ' beta% ' ;
Jenis kueri DISTINCT
pada kolom yang merupakan kolom paling kiri dalam indeks dapat dilakukan dengan sangat cepat di MySQL berkat teknik yang disebut Loose Index Scan. Namun PostgreSQL dan SQLite tidak menerapkan teknik ini, yang berarti jika tabel solid_queue_ready_executions
Anda sangat besar karena antrian Anda menjadi sangat dalam, kueri ini akan menjadi lambat. Biasanya tabel solid_queue_ready_executions
Anda berukuran kecil, tetapi itu bisa terjadi.
Sama halnya dengan penggunaan awalan, hal yang sama akan terjadi jika Anda menjeda antrean, karena kita perlu mendapatkan daftar semua antrean dengan kueri seperti
SELECT DISTINCT (queue_name)
FROM solid_queue_ready_executions
lalu hapus yang dijeda. Jeda secara umum harus menjadi sesuatu yang jarang terjadi, digunakan dalam keadaan khusus, dan untuk jangka waktu singkat. Jika Anda tidak ingin lagi memproses pekerjaan dari antrean, cara terbaik untuk melakukannya adalah dengan menghapusnya dari daftar antrean Anda.
Singkatnya, jika Anda ingin memastikan kinerja optimal pada jajak pendapat , cara terbaik untuk melakukannya adalah dengan selalu menentukan nama yang tepat untuk jajak pendapat tersebut, dan tidak menjeda antrean apa pun.
Lakukan ini:
queues : background, backend
alih-alih ini:
queues : back*
Pekerja di Antrean Padat menggunakan kumpulan thread untuk menjalankan pekerjaan di beberapa thread, dapat dikonfigurasi melalui parameter threads
di atas. Selain itu, paralelisme dapat dicapai melalui beberapa proses pada satu mesin (dapat dikonfigurasi melalui pekerja berbeda atau parameter processes
di atas) atau dengan penskalaan horizontal.
Supervisor bertanggung jawab mengelola proses-proses ini, dan merespons sinyal-sinyal berikut:
TERM
, INT
: memulai penghentian dengan baik. Supervisor akan mengirimkan sinyal TERM
ke proses yang diawasinya, dan akan menunggu hingga waktu SolidQueue.shutdown_timeout
hingga selesai. Jika masih ada proses yang diawasi pada saat itu, maka proses tersebut akan mengirimkan sinyal QUIT
untuk menunjukkan bahwa proses tersebut harus keluar.QUIT
: memulai penghentian segera. Supervisor akan mengirimkan sinyal QUIT
ke proses yang diawasinya, menyebabkan proses tersebut segera keluar. Saat menerima sinyal QUIT
, jika pekerja masih memiliki pekerjaan dalam penerbangan, pekerjaan tersebut akan dikembalikan ke antrian ketika proses dibatalkan pendaftarannya.
Jika proses tidak memiliki kesempatan untuk dibersihkan sebelum keluar (misalnya jika seseorang menarik kabel di suatu tempat), pekerjaan dalam penerbangan mungkin tetap diklaim oleh proses yang menjalankannya. Proses mengirimkan detak jantung, dan supervisor memeriksa serta memangkas proses dengan detak jantung yang sudah kadaluwarsa, yang akan melepaskan pekerjaan yang diklaim kembali ke antreannya. Anda dapat mengonfigurasi frekuensi detak jantung dan ambang batas untuk menganggap proses mati. Lihat bagian di bawah untuk ini.
Anda dapat mengkonfigurasi database yang digunakan oleh Solid Queue melalui opsi config.solid_queue.connects_to
di file konfigurasi config/application.rb
atau config/environments/production.rb
. Secara default, satu database digunakan untuk menulis dan membaca yang disebut queue
untuk mencocokkan konfigurasi database yang Anda atur saat instalasi.
Semua opsi yang tersedia pada Rekaman Aktif untuk beberapa database dapat digunakan di sini.
Dalam antrian Solid, Anda dapat terhubung ke dua titik berbeda dalam kehidupan supervisor:
start
: setelah supervisor selesai melakukan booting dan tepat sebelum memisahkan pekerja dan petugas operator.stop
: setelah menerima sinyal ( TERM
, INT
atau QUIT
) dan tepat sebelum memulai penghentian dengan baik atau langsung.Dan menjadi dua titik berbeda dalam kehidupan seorang pekerja:
worker_start
: setelah pekerja selesai booting dan tepat sebelum memulai loop polling.worker_stop
: setelah menerima sinyal ( TERM
, INT
atau QUIT
) dan tepat sebelum memulai penutupan dengan baik atau langsung (yaitu exit!
).Anda dapat menggunakan metode berikut dengan blok untuk melakukan ini:
SolidQueue . on_start
SolidQueue . on_stop
SolidQueue . on_worker_start
SolidQueue . on_worker_stop
Misalnya:
SolidQueue . on_start { start_metrics_server }
SolidQueue . on_stop { stop_metrics_server }
Ini dapat dipanggil beberapa kali untuk menambahkan beberapa kait, tetapi ini harus dilakukan sebelum Antrean Padat dimulai. Inisialisasi akan menjadi tempat yang baik untuk melakukan ini.
Catatan : Pengaturan di bagian ini harus diatur di config/application.rb
atau konfigurasi lingkungan Anda seperti ini: config.solid_queue.silence_polling = true
Ada beberapa pengaturan yang mengontrol cara kerja Solid Queue yang dapat Anda atur juga:
logger
: logger yang Anda ingin gunakan Solid Queue. Defaultnya adalah pencatat aplikasi.
app_executor
: pelaksana Rails yang digunakan untuk menggabungkan operasi asinkron, defaultnya adalah pelaksana aplikasi
on_thread_error
: lambda/Proc khusus untuk dipanggil ketika ada kesalahan dalam thread Antrean Padat yang menjadikan pengecualian sebagai argumen. Defaultnya adalah
-> ( exception ) { Rails . error . report ( exception , handled : false ) }
Ini tidak digunakan untuk kesalahan yang muncul dalam pelaksanaan pekerjaan . Kesalahan yang terjadi dalam pekerjaan ditangani oleh retry_on
atau discard_on
Pekerjaan Aktif, dan pada akhirnya akan mengakibatkan pekerjaan gagal. Ini untuk kesalahan yang terjadi dalam Solid Queue itu sendiri.
use_skip_locked
: apakah akan menggunakan FOR UPDATE SKIP LOCKED
saat melakukan pembacaan penguncian. Ini akan terdeteksi secara otomatis di masa mendatang, dan untuk saat ini, Anda hanya perlu menyetelnya ke false
jika database Anda tidak mendukungnya. Untuk MySQL, versinya <8, dan untuk PostgreSQL, versi <9.5. Jika Anda menggunakan SQLite, ini tidak berpengaruh, karena penulisannya berurutan.
process_heartbeat_interval
: interval detak jantung yang akan diikuti oleh semua proses—defaultnya adalah 60 detik.
process_alive_threshold
: berapa lama menunggu hingga suatu proses dianggap mati setelah detak jantung terakhirnya—defaultnya adalah 5 menit.
shutdown_timeout
: waktu menunggu supervisor sejak mengirimkan sinyal TERM
ke proses yang diawasinya sebelum mengirimkan versi QUIT
kepada mereka untuk meminta penghentian segera—defaultnya adalah 5 detik.
silence_polling
: apakah akan membungkam log Rekaman Aktif yang dikeluarkan saat melakukan polling untuk pekerja dan petugas operator—defaultnya adalah true
.
supervisor_pidfile
: jalur ke pidfile yang akan dibuat supervisor saat booting untuk mencegah menjalankan lebih dari satu supervisor di host yang sama, atau jika Anda ingin menggunakannya untuk pemeriksaan kesehatan. Ini nil
secara default.
preserve_finished_jobs
: apakah akan menyimpan pekerjaan yang sudah selesai di tabel solid_queue_jobs
—defaultnya adalah true
.
clear_finished_jobs_after
: periode untuk menyimpan pekerjaan yang sudah selesai, jika preserve_finished_jobs
benar—defaultnya adalah 1 hari. Catatan: Saat ini, tidak ada pembersihan otomatis pada pekerjaan yang sudah selesai. Anda harus melakukan ini dengan memanggil SolidQueue::Job.clear_finished_in_batches
secara berkala, tetapi ini akan terjadi secara otomatis dalam waktu dekat.
default_concurrency_control_period
: nilai yang akan digunakan sebagai default untuk parameter duration
dalam kontrol konkurensi. Defaultnya adalah 3 menit.
Solid Queue akan memunculkan SolidQueue::Job::EnqueueError
untuk setiap kesalahan Rekaman Aktif yang terjadi saat mengantri pekerjaan. Alasan tidak menaikkan ActiveJob::EnqueueError
adalah karena tugas ini ditangani oleh Pekerjaan Aktif, menyebabkan perform_later
mengembalikan false
dan menyetel job.enqueue_error
, sehingga pekerjaan tersebut dimasukkan ke dalam blok yang harus Anda teruskan ke perform_later
. Ini bekerja sangat baik untuk pekerjaan Anda sendiri, tetapi membuat kegagalan menjadi sangat sulit untuk ditangani untuk pekerjaan yang dilakukan oleh Rails atau permata lainnya, seperti Turbo::Streams::BroadcastJob
atau ActiveStorage::AnalyzeJob
, karena Anda tidak mengontrol panggilan ke perform_later
dalam kasus tersebut.
Dalam kasus tugas berulang, jika kesalahan tersebut muncul saat memasukkan pekerjaan yang sesuai dengan tugas tersebut ke dalam antrean, kesalahan tersebut akan ditangani dan dicatat tetapi tidak akan muncul.
Antrian Padat memperluas Pekerjaan Aktif dengan kontrol konkurensi, yang memungkinkan Anda membatasi berapa banyak pekerjaan dengan tipe tertentu atau dengan argumen tertentu yang dapat dijalankan pada waktu yang sama. Jika dibatasi dengan cara ini, tugas akan diblokir agar tidak berjalan, dan tugas tersebut akan tetap diblokir hingga tugas lain selesai dan membuka blokirnya, atau setelah waktu kedaluwarsa yang ditetapkan ( durasi batas konkurensi) terlewati. Pekerjaan tidak pernah dibuang atau hilang, hanya diblokir.
class MyJob < ApplicationJob
limits_concurrency to : max_concurrent_executions , key : -> ( arg1 , arg2 , ** ) { ... } , duration : max_interval_to_guarantee_concurrency_limit , group : concurrency_group
# ...
key
adalah satu-satunya parameter yang diperlukan, dan dapat berupa simbol, string, atau proc yang menerima argumen pekerjaan sebagai parameter dan akan digunakan untuk mengidentifikasi pekerjaan yang perlu dibatasi bersama-sama. Jika proc mengembalikan rekaman Rekaman Aktif, kunci akan dibuat dari nama kelas dan id
.to
adalah 1
secara default.duration
diatur ke SolidQueue.default_concurrency_control_period
secara default, yang defaultnya adalah 3 minutes
, tetapi Anda juga dapat mengonfigurasinya.group
digunakan untuk mengontrol konkurensi kelas pekerjaan yang berbeda secara bersamaan. Defaultnya adalah nama kelas pekerjaan. Ketika suatu pekerjaan mencakup kontrol-kontrol ini, kami akan memastikan bahwa, paling banyak, jumlah pekerjaan (ditunjukkan to
) yang menghasilkan key
yang sama akan dilakukan secara bersamaan, dan jaminan ini akan berlaku selama duration
setiap pekerjaan yang masuk dalam antrean. Perhatikan bahwa tidak ada jaminan tentang urutan eksekusi , hanya tentang pekerjaan yang dilakukan pada waktu yang sama (tumpang tindih).
Batas konkurensi menggunakan konsep semaphore saat melakukan enqueuing, dan berfungsi sebagai berikut: saat tugas berada dalam antrean, kami memeriksa apakah tugas tersebut menentukan kontrol konkurensi. Jika ya, kami memeriksa semafor untuk kunci konkurensi yang dihitung. Jika semaphore terbuka, kami mengklaimnya dan menetapkan pekerjaan sebagai ready . Siap berarti dapat diambil oleh pekerja untuk dieksekusi. Ketika pekerjaan selesai dijalankan (baik berhasil atau tidak, mengakibatkan eksekusi gagal), kami memberi sinyal semaphore dan mencoba membuka blokir pekerjaan berikutnya dengan kunci yang sama, jika ada. Membuka blokir pekerjaan berikutnya tidak berarti langsung menjalankan pekerjaan itu, namun memindahkannya dari diblokir ke siap . Karena sesuatu dapat terjadi yang mencegah pekerjaan pertama melepaskan semaphore dan membuka blokir pekerjaan berikutnya (misalnya, seseorang mencabut steker di mesin tempat pekerja tersebut berjalan), kita mempunyai duration
sebagai pengaman kegagalan. Pekerjaan yang telah diblokir selama lebih dari durasi adalah kandidat yang akan dilepaskan, namun hanya sebanyak pekerjaan yang diizinkan oleh aturan konkurensi, karena masing-masing pekerjaan harus melalui pemeriksaan tarian semaphore. Artinya, duration
sebenarnya bukan tentang pekerjaan yang sedang dalam antrean atau sedang dijalankan, melainkan tentang pekerjaan yang diblokir menunggu.
Misalnya:
class DeliverAnnouncementToContactJob < ApplicationJob
limits_concurrency to : 2 , key : -> ( contact ) { contact . account } , duration : 5 . minutes
def perform ( contact )
# ...
Dimana contact
dan account
adalah catatan ActiveRecord
. Dalam hal ini, kami akan memastikan bahwa paling banyak dua pekerjaan sejenis DeliverAnnouncementToContact
untuk akun yang sama akan berjalan secara bersamaan. Jika, karena alasan apa pun, salah satu tugas tersebut memerlukan waktu lebih dari 5 menit atau tidak melepaskan kunci konkurensinya (menandakan semaphore) dalam waktu 5 menit setelah memperolehnya, tugas baru dengan kunci yang sama mungkin mendapatkan kunci tersebut.
Mari kita lihat contoh lain menggunakan group
:
class Box :: MovePostingsByContactToDesignatedBoxJob < ApplicationJob
limits_concurrency key : -> ( contact ) { contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( contact )
# ...
class Bundle :: RebundlePostingsJob < ApplicationJob
limits_concurrency key : -> ( bundle ) { bundle . contact } , duration : 15 . minutes , group : "ContactActions"
def perform ( bundle )
# ...
Dalam hal ini, jika kita memiliki pekerjaan Box::MovePostingsByContactToDesignatedBoxJob
yang diantrekan untuk catatan kontak dengan id 123
dan Bundle::RebundlePostingsJob
yang diantri secara bersamaan untuk catatan bundel yang mereferensikan contact 123
, hanya satu dari mereka yang akan diizinkan untuk melanjutkan. Yang lainnya akan tetap diblokir sampai yang pertama selesai (atau 15 menit berlalu, apa pun yang terjadi terlebih dahulu).
Perhatikan bahwa pengaturan duration
bergantung secara tidak langsung pada nilai concurrency_maintenance_interval
yang Anda tetapkan untuk operator Anda, karena itulah frekuensi pemeriksaan dan pemblokiran pekerjaan yang diblokir. Secara umum, Anda harus menetapkan duration
sedemikian rupa sehingga semua pekerjaan Anda akan selesai dengan baik di bawah durasi tersebut dan menganggap tugas pemeliharaan konkurensi sebagai tindakan yang aman jika terjadi kesalahan.
Pekerjaan dibuka blokirnya berdasarkan prioritas tetapi urutan antrean tidak diperhitungkan untuk membuka blokir pekerjaan. Artinya, jika Anda memiliki sekelompok pekerjaan yang berbagi grup konkurensi namun berada dalam antrean yang berbeda, atau pekerjaan dari kelas yang sama yang Anda antri dalam antrean berbeda, urutan antrean yang Anda tetapkan untuk seorang pekerja tidak diperhitungkan saat membuka blokir yang diblokir yang. Alasannya adalah bahwa pekerjaan yang berjalan akan membuka blokir pekerjaan berikutnya, dan pekerjaan itu sendiri tidak mengetahui urutan antrian pekerja tertentu (Anda bahkan dapat memiliki pekerja yang berbeda dengan urutan antrian yang berbeda), ia hanya dapat mengetahui tentang prioritas. Setelah pekerjaan yang diblokir dibuka blokirnya dan tersedia untuk pemungutan suara, pekerjaan tersebut akan diambil oleh pekerja mengikuti urutan antreannya.
Terakhir, tugas yang gagal yang dicoba ulang secara otomatis atau manual berfungsi dengan cara yang sama seperti tugas baru yang masuk dalam antrean: tugas tersebut masuk dalam antrean untuk mendapatkan semaphore terbuka, dan setiap kali mendapatkannya, tugas tersebut akan dijalankan. Tidak masalah jika mereka sudah mendapatkan semaphore terbuka di masa lalu.
Antrian Padat tidak menyertakan mekanisme coba ulang otomatis apa pun, ini bergantung pada Pekerjaan Aktif untuk ini. Pekerjaan yang gagal akan disimpan dalam sistem, dan eksekusi yang gagal (catatan dalam tabel solid_queue_failed_executions
) akan dibuat untuk tugas tersebut. Pekerjaan akan tetap di sana sampai dibuang secara manual atau dimasukkan kembali ke dalam antrean. Anda dapat melakukan ini di konsol sebagai:
failed_execution = SolidQueue :: FailedExecution . find ( ... ) # Find the failed execution related to your job
failed_execution . error # inspect the error
failed_execution . retry # This will re-enqueue the job as if it was enqueued for the first time
failed_execution . discard # This will delete the job from the system
Namun, kami menyarankan untuk melihat misi_kontrol-pekerjaan, sebuah dasbor tempat, antara lain, Anda dapat memeriksa dan mencoba kembali/membuang pekerjaan yang gagal.
Beberapa layanan pelacakan kesalahan yang terintegrasi dengan Rails, seperti Sentry atau Rollbar, terhubung ke Pekerjaan Aktif dan secara otomatis melaporkan kesalahan yang tidak tertangani yang terjadi selama pelaksanaan pekerjaan. Namun, jika sistem pelacakan kesalahan Anda tidak menyediakannya, atau jika Anda memerlukan pelaporan khusus, Anda dapat menghubungkan sendiri ke Pekerjaan Aktif. Cara yang mungkin untuk melakukan hal ini adalah:
# application_job.rb
class ApplicationJob < ActiveJob :: Base
rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Perhatikan bahwa, Anda juga harus menduplikasi logika di atas pada ActionMailer::MailDeliveryJob
. Itu karena ActionMailer
tidak mewarisi dari ApplicationJob
melainkan menggunakan ActionMailer::MailDeliveryJob
yang mewarisi dari ActiveJob::Base
.
# application_mailer.rb
class ApplicationMailer < ActionMailer :: Base
ActionMailer :: MailDeliveryJob . rescue_from ( Exception ) do | exception |
Rails . error . report ( exception )
raise exception
end
end
Kami menyediakan plugin Puma jika Anda ingin menjalankan supervisor Solid Queue bersama Puma dan meminta Puma memantau dan mengelolanya. Anda hanya perlu menambahkan
plugin :solid_queue
ke konfigurasi puma.rb
Anda.
Karena ini bisa sangat rumit dan banyak orang tidak perlu khawatir tentang hal ini, secara default Antrean Padat dikonfigurasi dalam database yang berbeda dengan aplikasi utama, antrian pekerjaan ditunda hingga transaksi yang sedang berlangsung dilakukan berkat bawaan Pekerjaan Aktif. kemampuan untuk melakukan hal ini. Artinya, meskipun Anda menjalankan Solid Queue di DB yang sama dengan aplikasi Anda, Anda tidak akan memanfaatkan integritas transaksional ini.
Jika Anda memilih untuk mengubahnya, Anda dapat mengatur config.active_job.enqueue_after_transaction_commit
menjadi never
. Anda juga dapat mengaturnya berdasarkan per pekerjaan.
Jika Anda menyetelnya ke never
namun tetap ingin memastikan bahwa Anda tidak secara tidak sengaja berada pada integritas transaksional, Anda dapat memastikan bahwa:
Pekerjaan Anda yang mengandalkan data tertentu selalu dimasukkan dalam antrean pada panggilan balik after_commit
atau dari tempat di mana Anda yakin bahwa data apa pun yang akan digunakan pekerjaan tersebut telah dimasukkan ke dalam database sebelum pekerjaan tersebut dimasukkan dalam antrean.
Atau, Anda mengonfigurasi database yang berbeda untuk Antrean Padat, meskipun sama dengan aplikasi Anda, memastikan bahwa koneksi berbeda pada thread yang menangani permintaan atau menjalankan tugas untuk aplikasi Anda akan digunakan untuk mengantrekan tugas. Misalnya:
class ApplicationRecord < ActiveRecord :: Base
self . abstract_class = true
connects_to database : { writing : :primary , reading : :replica }
config . solid_queue . connects_to = { database : { writing : :primary , reading : :replica } }
Solid Queue mendukung pendefinisian tugas berulang yang dijalankan pada waktu tertentu di masa depan, secara teratur seperti pekerjaan cron. Ini dikelola oleh proses penjadwal dan ditentukan dalam file konfigurasinya sendiri. Secara default, file terletak di config/recurring.yml
, tetapi Anda dapat mengatur jalur berbeda menggunakan variabel lingkungan SOLID_QUEUE_RECURRING_SCHEDULE
atau dengan menggunakan opsi --recurring_schedule_file
dengan bin/jobs
, seperti ini:
bin/jobs --recurring_schedule_file=config/schedule.yml
Konfigurasinya sendiri terlihat seperti ini:
production :
a_periodic_job :
class : MyJob
args : [ 42, { status: "custom_status" } ]
schedule : every second
a_cleanup_task :
command : " DeletedStuff.clear_all "
schedule : every day at 9am
Tugas ditentukan sebagai hash/kamus, yang mana kuncinya akan menjadi kunci tugas secara internal. Setiap tugas harus memiliki class
, yang akan menjadi kelas pekerjaan yang akan diantrekan, atau sebuah command
, yang akan dievaluasi dalam konteks pekerjaan ( SolidQueue::RecurringJob
) yang akan dimasukkan dalam antrean sesuai dengan jadwalnya, di antrian solid_queue_recurring
.
Setiap tugas juga perlu memiliki jadwal, yang diurai menggunakan Fugit, sehingga menerima apa pun yang diterima Fugit sebagai cron. Secara opsional, Anda dapat menyediakan yang berikut ini untuk setiap tugas:
args
: argumen yang akan diteruskan ke pekerjaan, sebagai argumen tunggal, hash, atau array argumen yang juga dapat menyertakan kwargs sebagai elemen terakhir dalam array.Pekerjaan dalam contoh konfigurasi di atas akan dimasukkan dalam antrean setiap detik sebagai:
MyJob . perform_later ( 42 , status : "custom_status" )
queue
: antrian berbeda yang akan digunakan saat mengantri pekerjaan. Jika tidak ada, antrian disiapkan untuk kelas pekerjaan.
priority
: nilai prioritas numerik yang akan digunakan saat mengantrekan pekerjaan.
Tugas dimasukkan dalam antrean pada waktunya masing-masing oleh penjadwal, dan setiap tugas menjadwalkan tugas berikutnya. Ini cukup terinspirasi oleh apa yang dilakukan GoodJob.
Dimungkinkan untuk menjalankan beberapa penjadwal dengan konfigurasi recurring_tasks
yang sama, misalnya, jika Anda memiliki beberapa server untuk redundansi, dan Anda menjalankan scheduler
di lebih dari satu server tersebut. Untuk menghindari enqueuing tugas duplikat pada saat yang sama, sebuah entri dalam tabel solid_queue_recurring_executions
baru dibuat dalam transaksi yang sama dengan pekerjaan yang dimasukkan ke dalam antrean. Tabel ini memiliki indeks unik pada task_key
dan run_at
, memastikan hanya satu entri per tugas per waktu yang akan dibuat. Ini hanya berfungsi jika Anda telah preserve_finished_jobs
disetel ke true
(default), dan jaminan berlaku selama Anda tetap mempertahankan pekerjaan tersebut.
Catatan : satu jadwal berulang didukung, sehingga Anda dapat memiliki beberapa penjadwal menggunakan jadwal yang sama, namun tidak beberapa penjadwal yang menggunakan konfigurasi berbeda.
Terakhir, dimungkinkan untuk mengonfigurasi pekerjaan yang tidak ditangani oleh Solid Queue. Artinya, Anda dapat memiliki pekerjaan seperti ini di aplikasi Anda:
class MyResqueJob < ApplicationJob
self . queue_adapter = :resque
def perform ( arg )
# ..
end
end
Anda masih dapat mengonfigurasi ini di Antrean Padat:
my_periodic_resque_job :
class : MyResqueJob
args : 22
schedule : " */5 * * * * "
dan pekerjaan akan dimasukkan ke dalam antrean melalui perform_later
sehingga akan dijalankan di Resque. Namun, dalam hal ini kami tidak akan melacak rekaman solid_queue_recurring_execution
untuk tugas tersebut dan tidak akan ada jaminan bahwa pekerjaan hanya dimasukkan ke dalam antrean sekali setiap kali.
Solid Queue terinspirasi oleh resque dan GoodJob. Kami merekomendasikan untuk memeriksa proyek-proyek ini karena ini adalah contoh bagus yang telah kami pelajari banyak.
Permata ini tersedia sebagai sumber terbuka berdasarkan ketentuan Lisensi MIT.