Tingkatkan antrian Anda dengan front end profesional:
Dapatkan gambaran lengkap tentang semua antrean Anda.
Periksa pekerjaan, cari, coba lagi, atau promosikan pekerjaan yang tertunda.
Metrik dan statistik.
dan masih banyak lagi fitur lainnya.
Daftar di Taskforce.sh
Penggunaan CPU minimal karena desain bebas polling.
Desain kokoh berdasarkan Redis.
Pekerjaan tertunda.
Jadwalkan dan ulangi pekerjaan sesuai dengan spesifikasi cron.
Pembatas tarif untuk pekerjaan.
Percobaan ulang.
Prioritas.
Konkurensi.
Jeda/lanjutkan—secara global atau lokal.
Beberapa jenis pekerjaan per antrian.
Fungsi pemrosesan berulir (kotak pasir).
Pemulihan otomatis dari proses macet.
Dan muncul di peta jalan...
Pengakuan penyelesaian pekerjaan (sementara itu Anda dapat menggunakan pola antrian pesan).
Hubungan pekerjaan orang tua-anak.
Ada beberapa UI pihak ketiga yang dapat Anda gunakan untuk pemantauan:
BantengMQ
Gugus Tugas
Banteng v3
Gugus Tugas
papan banteng
balasan banteng
monitor banteng
Monitor
Banteng <= v2
Matador
bereaksi-banteng
Toureiro
Dengan Eksportir Antrian Prometheus Bull
Karena ada beberapa solusi antrian pekerjaan, berikut adalah tabel yang membandingkannya:
Dragonfly adalah pengganti drop-in Redis™ baru yang sepenuhnya kompatibel dengan BullMQ dan menghadirkan beberapa keunggulan penting dibandingkan Redis™ seperti kinerja yang jauh lebih baik dengan memanfaatkan semua inti CPU yang tersedia dan struktur data yang lebih cepat dan lebih hemat memori. Baca selengkapnya di sini tentang cara menggunakannya dengan BullMQ. | |
Jika Anda memerlukan instance Redis produksi berkualitas tinggi untuk proyek Bull Anda, harap pertimbangkan untuk berlangganan Memetria untuk Redis, pemimpin dalam hosting Redis yang bekerja sempurna dengan BullMQ. Gunakan kode promo "BULLMQ" saat mendaftar untuk membantu kami mensponsori pengembangan BullMQ! |
Fitur | BullMQ-Pro | BantengMQ | Banteng | Kue | Lebah | Agenda |
---|---|---|---|---|---|---|
Bagian belakang | redis | redis | redis | redis | redis | mongo |
Dapat diamati | ✓ | |||||
Batas Tarif Grup | ✓ | |||||
Dukungan Grup | ✓ | |||||
Dukungan Batch | ✓ | |||||
Ketergantungan Orang Tua/Anak | ✓ | ✓ | ||||
Prioritas | ✓ | ✓ | ✓ | ✓ | ✓ | |
Konkurensi | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
Pekerjaan tertunda | ✓ | ✓ | ✓ | ✓ | ✓ | |
Peristiwa global | ✓ | ✓ | ✓ | ✓ | ||
Pembatas Nilai | ✓ | ✓ | ✓ | |||
Jeda/Lanjutkan | ✓ | ✓ | ✓ | ✓ | ||
Pekerja dalam kotak pasir | ✓ | ✓ | ✓ | |||
Pekerjaan yang dapat diulang | ✓ | ✓ | ✓ | ✓ | ||
Operasi atom | ✓ | ✓ | ✓ | ✓ | ||
Kegigihan | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
UI | ✓ | ✓ | ✓ | ✓ | ✓ | |
Dioptimalkan untuk | Pekerjaan / Pesan | Pekerjaan / Pesan | Pekerjaan / Pesan | Pekerjaan | Pesan | Pekerjaan |
npm instal banteng --simpan
atau
benang tambahkan banteng
Persyaratan: Bull memerlukan versi Redis yang lebih besar atau sama dengan 2.8.18
.
npm install @types/bull --save-dev
benang tambahkan --dev @types/bull
Definisi saat ini disimpan di repo PastiTyped.
Kami menyambut semua jenis kontribusi, baik perbaikan kode, fitur baru, atau peningkatan dokumen. Pemformatan kode diterapkan dengan lebih cantik. Untuk komitmen, harap ikuti konvensi komitmen konvensional. Semua kode harus melewati aturan lint dan rangkaian pengujian sebelum dapat digabungkan ke dalam pengembangan.
const Antrian = memerlukan('bull');const videoQueue = Antrian baru('transcoding video', 'redis://127.0.0.1:6379');const audioQueue = Antrian baru('transcoding audio', { redis: { port : 6379, host: '127.0.0.1', kata sandi: 'foobared' } }); // Tentukan koneksi Redis menggunakan objectconst imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (pekerjaan, selesai) { // job.data berisi data khusus yang diteruskan saat tugas dibuat // pekerjaan.id berisi id pekerjaan ini. // mentranskode video secara asinkron dan melaporkan kemajuannya pekerjaan.kemajuan(42); // panggilan selesai setelah selesai Selesai(); // atau memberikan error jika error selesai(Kesalahan baru('kesalahan transcoding')); // atau berikan hasilnya selesai(null, { framerate: 29,5 /* dst... */ }); // Jika tugas memunculkan pengecualian yang tidak tertangani, maka tugas tersebut juga ditangani dengan benar throw new Error('beberapa kesalahan tak terduga');});audioQueue.process(function (pekerjaan, selesai) { // mentranskode audio secara asinkron dan melaporkan kemajuannya pekerjaan.kemajuan(42); // panggilan selesai setelah selesai Selesai(); // atau memberikan error jika error selesai(Kesalahan baru('kesalahan transcoding')); // atau berikan hasilnya selesai(null, { samplerate: 48000 /* dst... */ }); // Jika tugas memunculkan pengecualian yang tidak tertangani, maka tugas tersebut juga ditangani dengan benar throw new Error('beberapa kesalahan tak terduga');});imageQueue.process(function (pekerjaan, selesai) { // mentranskode gambar secara asinkron dan melaporkan kemajuannya pekerjaan.kemajuan(42); // panggilan selesai setelah selesai Selesai(); // atau memberikan error jika error selesai(Kesalahan baru('kesalahan transcoding')); // atau berikan hasilnya selesai(null, { lebar: 1280, tinggi: 720 /* dst... */ }); // Jika tugas memunculkan pengecualian yang tidak tertangani, maka tugas tersebut juga ditangani dengan benar throw new Error('beberapa kesalahan tak terduga');});pdfQueue.process(function (pekerjaan) { // Prosesor juga dapat mengembalikan janji daripada menggunakan callback yang sudah selesai return pdfAsyncProcessor();});videoQueue.add({ video: 'http://example.com/video1.mov' });audioQueue.add({ audio: 'http://example.com/audio1.mp3 ' });imageQueue.add({ gambar: 'http://example.com/image1.tiff' });
Alternatifnya, Anda dapat mengembalikan janji alih-alih menggunakan panggilan balik done
:
videoQueue.process(function (job) {// jangan lupa untuk menghapus callback yang sudah selesai! // Cukup kembalikan janji kembali ambilVideo(pekerjaan.data.url).lalu(transcodeVideo); // Menangani penolakan janji return Promise.reject(Kesalahan baru('kesalahan transcoding')); // Melewati nilai janji yang diselesaikan ke acara "selesai". return Promise.resolve({ framerate: 29,5 /* dll... */ }); // Jika tugas memunculkan pengecualian yang tidak tertangani, maka tugas tersebut juga ditangani dengan benar melempar Kesalahan baru('beberapa kesalahan tak terduga'); // sama dengan return Promise.reject(Kesalahan baru('beberapa kesalahan tak terduga'));});
Fungsi proses juga dapat dijalankan dalam proses terpisah. Ini mempunyai beberapa keuntungan:
Prosesnya di-sandbox jadi jika crash tidak mempengaruhi pekerja.
Anda dapat menjalankan kode pemblokiran tanpa mempengaruhi antrian (pekerjaan tidak akan terhenti).
Pemanfaatan CPU multi-core yang jauh lebih baik.
Lebih sedikit koneksi ke redis.
Untuk menggunakan fitur ini cukup buat file terpisah dengan prosesor:
// processor.jsmodule.exports = fungsi (pekerjaan) { // Lakukan pekerjaan berat return Promise.resolve(hasil);}
Dan tentukan prosesor seperti ini:
// Proses tunggal:queue.process('/path/to/my/processor.js');// Anda juga dapat menggunakan konkurensi:queue.process(5, '/path/to/my/processor.js' );// dan diberi nama processors:queue.process('my processor', 5, '/path/to/my/processor.js');
Sebuah pekerjaan dapat ditambahkan ke antrian dan diproses berulang kali sesuai dengan spesifikasi cron:
pembayaranQueue.process(fungsi (pekerjaan) {// Periksa pembayaran }); // Ulangi tugas pembayaran sekali setiap hari pada pukul 3:15 (pagi) pembayaranQueue.add(pembayaranData, { ulangi: { cron: '15 3 * * *' } });
Sebagai tip, periksa ekspresi Anda di sini untuk memverifikasi kebenarannya: generator ekspresi cron
Antrean dapat dijeda dan dilanjutkan secara global (teruskan true
untuk menjeda pemrosesan hanya untuk pekerja ini):
antrian.jeda().lalu(fungsi() { // antrian dijeda sekarang});queue.resume().then(function () { // antrian dilanjutkan sekarang})
Antrian mengeluarkan beberapa peristiwa berguna, misalnya...
.on('selesai', fungsi (pekerjaan, hasil) { // Pekerjaan selesai dengan hasil keluaran!})
Untuk informasi selengkapnya tentang peristiwa, termasuk daftar lengkap peristiwa yang diaktifkan, lihat referensi Peristiwa
Antrian itu murah, jadi jika Anda membutuhkannya banyak, buat saja yang baru dengan nama berbeda:
const userJohn = Antrean baru('john');const penggunaLisa = Antrian baru('lisa');...
Namun setiap instance antrian akan memerlukan koneksi redis baru, periksa cara menggunakan kembali koneksi atau Anda juga dapat menggunakan prosesor bernama untuk mencapai hasil serupa.
CATATAN: Dari versi 3.2.0 ke atas, disarankan untuk menggunakan prosesor berulir.
Antrian bersifat kuat dan dapat dijalankan secara paralel di beberapa thread atau proses tanpa risiko bahaya atau kerusakan antrian. Periksa contoh sederhana ini menggunakan cluster untuk memparalelkan pekerjaan di seluruh proses:
const Antrean = memerlukan('banteng');const cluster = memerlukan('cluster');const numWorkers = 8;const antrian = Antrian baru('uji antrian serentak');if (cluster.isMaster) { for (misalkan i = 0; i < numWorkers; i++) {cluster.fork(); } cluster.on('online', function (pekerja) {// Mari kita buat beberapa tugas untuk pekerja antrian (misalkan i = 0; i < 500; i++) { queue.add({ foo: 'bar' }); }; }); cluster.on('exit', function (pekerja, kode, sinyal) {console.log('worker ' + pekerja.process.pid + ' mati'); });} kalau tidak { antrian.proses(fungsi (pekerjaan, pekerjaanSelesai) {console.log('Pekerjaan selesai oleh pekerja', cluster.worker.id, pekerjaan.id);jobDone(); });}
Untuk dokumentasi selengkapnya, lihat referensi dan pola umum:
Panduan — Titik awal Anda untuk berkembang bersama Bull.
Referensi — Dokumen referensi dengan semua objek dan metode yang tersedia.
Pola — sekumpulan contoh pola umum.
Lisensi — lisensi Bull—itu adalah MIT.
Jika Anda melihat sesuatu yang memerlukan lebih banyak dokumen, silakan kirimkan permintaan penarikan!
Antrian tersebut bertujuan untuk strategi kerja "setidaknya sekali". Artinya, dalam beberapa situasi, suatu pekerjaan dapat diproses lebih dari satu kali. Hal ini sebagian besar terjadi ketika seorang pekerja gagal mengunci pekerjaan tertentu selama total durasi pemrosesan.
Ketika seorang pekerja sedang memproses suatu pekerjaan, pekerjaan itu akan tetap "terkunci" sehingga pekerja lain tidak dapat memprosesnya.
Penting untuk memahami cara kerja penguncian untuk mencegah pekerjaan Anda kehilangan kuncinya - menjadi terhenti - dan akibatnya dimulai ulang. Penguncian diterapkan secara internal dengan membuat kunci untuk lockDuration
pada interval lockRenewTime
(yang biasanya setengah lockDuration
). Jika lockDuration
berlalu sebelum kunci dapat diperbarui, pekerjaan akan dianggap terhenti dan dimulai ulang secara otomatis; itu akan diproses ganda . Ini bisa terjadi ketika:
Proses Node yang menjalankan pemroses pekerjaan Anda tiba-tiba berhenti.
Pemroses pekerjaan Anda terlalu intensif CPU dan menghentikan loop peristiwa Node, dan akibatnya, Bull tidak dapat memperbarui kunci pekerjaan (lihat #488 untuk mengetahui cara kami mendeteksi hal ini dengan lebih baik). Anda dapat memperbaikinya dengan memecah pemroses pekerjaan Anda menjadi bagian-bagian yang lebih kecil sehingga tidak ada satu bagian pun yang dapat memblokir loop peristiwa Node. Alternatifnya, Anda dapat memberikan nilai yang lebih besar untuk pengaturan lockDuration
(dengan konsekuensinya akan memakan waktu lebih lama untuk mengenali pekerjaan yang benar-benar terhenti).
Oleh karena itu, Anda harus selalu mendengarkan kejadian stalled
dan mencatatnya ke sistem pemantauan kesalahan Anda, karena ini berarti pekerjaan Anda kemungkinan akan diproses ganda.
Sebagai perlindungan agar pekerjaan yang bermasalah tidak akan dimulai ulang tanpa batas waktu (misalnya jika pemroses pekerjaan selalu menghentikan proses Node-nya), pekerjaan akan dipulihkan dari keadaan terhenti maksimal maxStalledCount
kali (default: 1
).