Mesin antrian tugas yang didukung Redis dengan kontrol tugas tingkat lanjut dan konsistensi akhir.
Pengelompokan tugas, rangkaian, iterator untuk rentang yang sangat besar.
Tugas yang ditunda & dijadwalkan dijalankan.
Distribusi beban + kumpulan pekerja.
Mudah untuk disematkan.
idoit
memberikan kontrol tingkat lanjut untuk mengimplementasikannya
Pengelompokan . Tugas group
khusus melaksanakan tugas anak dan menunggu sampai semua selesai. Berguna untuk memetakan/mengurangi logika.
Rantai . Tugas chain
khusus mengeksekusi anak-anak satu per satu. Juga berguna untuk mengurangi peta atau membagi tugas yang sangat rumit menjadi langkah yang lebih sederhana.
Memetakan iterator . Fitur khusus untuk muatan besar, untuk menghasilkan potongan sesuai permintaan. Manfaat:
Tidak ada jeda pada fase pemetaan, pemrosesan bongkahan segera dimulai.
Mudah untuk mengoptimalkan kueri DB untuk membuat potongan dengan ukuran yang sama (lewati + batasi kueri sangat lambat pada data besar).
Kemajuan . Saat Anda menggunakan skenario grup/rantai/peta, mudah untuk memantau kemajuan total melalui induk teratas. Tugas mandiri yang panjang juga dapat memberi tahu pengguna tentang perubahan kemajuan.
Kolam pekerja . Anda dapat membagi tugas berdasarkan proses yang berbeda. Misalnya, jika Anda tidak ingin tugas-tugas berat menghalangi tugas-tugas ringan.
Penjadwal . Cron bawaan memungkinkan untuk menjalankan tugas pada jadwal tertentu.
Semua data di redis selalu konsisten.
Tugas tidak bisa hilang, tapi BISA dijalankan dua kali pada kasus Edge (jika proses terhenti saat fungsi tugas hampir selesai)
Kemajuan dapat dihitung "lebih cepat" jika task.progressAdd()
digunakan dan proses macet sebelum tugas selesai. Namun itu tidak penting, karena informasi tersebut hanya dapat digunakan untuk pembaruan bilah kemajuan antarmuka. Dalam kebanyakan kasus, Anda tidak akan melihat perbedaannya.
node.js
6+ dan redis
3.0+ diperlukan.
npm instal idoit --simpan
redisURL (String) - redis url koneksi.
konkurensi (Jumlah) - tugas maksimal yang akan digunakan secara paralel oleh satu pekerja, 100 secara default.
pool (String) - nama kumpulan pekerja, "default" jika tidak disetel. Digunakan jika instance antrian ini hanya menggunakan tugas (setelah .start()
). Anda dapat merutekan tugas ke kumpulan pekerja tertentu untuk menghindari penguncian yang tidak diinginkan. Anda dapat mengatur pool
ke Array, [ 'pool1', 'pool2' ]
untuk menggunakan tugas dari beberapa kumpulan (untuk tujuan pengembangan/pengujian).
ns (String) - namespace data, saat ini digunakan sebagai awalan kunci redis, "idoitqueue:" secara default.
Merupakan praktik yang baik untuk memiliki kumpulan pekerja terpisah untuk tugas-tugas pemblokiran berat dan tugas-tugas non-pemblokiran. Misalnya, tidak seorang pun boleh memblokir pengiriman email penting. Jadi, buat beberapa proses pekerja, sematkan proses tersebut ke kumpulan berbeda dan atur konkurensi tugas yang tepat. Tugas non-pemblokiran dapat dilakukan secara paralel, dan Anda dapat melakukannya dengan concurrency
default = 100. Tugas pemblokiran harus dilakukan satu per satu, setel concurrency
= 1 untuk pekerja tersebut.
Catatan. Mungkin saja Anda menghapus beberapa jenis tugas dari aplikasi Anda. Dalam hal ini data yatim piatu akan dihapus setelah 3 hari.
Pilihan:
name (String) - nama tugas.
baseClass (Fungsi) - opsional, konstruktor tugas dasar, "Tugas" secara default.
init (Fungsi) - opsional, digunakan untuk inisialisasi tugas asinkron, harus mengembalikan Promise
this (Objek) - tugas saat ini (total tugas tersedia sebagai this.total
).
taskID (Fungsi) - opsional, harus mengembalikan id tugas baru. Hanya diperlukan untuk membuat tugas "eksklusif", mengembalikan nilai acak secara default, yang disebut sebagai: function (taskData)
. Gula: jika Anda meneruskan string biasa, string itu akan dibungkus berfungsi, yang selalu mengembalikan string ini.
proses (Fungsi) - fungsi tugas utama, disebut sebagai: task.process(...args)
. Harus mengembalikan Promise
ini (Objek) - tugas saat ini.
coba lagi (Nomor) - opsional, jumlah percobaan ulang jika terjadi kesalahan, default 2.
retryDelay (Nomor) - opsional, penundaan dalam ms setelah percobaan ulang, default 60000 ms.
batas waktu (Nomor) - opsional, batas waktu eksekusi, default 120000 ms.
total (Nomor) - opsional, nilai kemajuan maksimal, default 1. Jika Anda tidak mengubah perilaku, kemajuan dimulai dengan 0 dan menjadi 1 di akhir tugas.
tundaDelay (Nomor) - opsional, jika tunda dipanggil tanpa penundaan, penundaan diasumsikan sama dengan nilai ini (dalam milidetik).
cron (String) - opsional, string cron ("15 */6 * * *"), default null.
track (Nomor) - default 3600000ms (1 jam). Saatnya mengingat tugas terjadwal dari cron untuk menghindari pengulangan jika beberapa server di cluster memiliki jam yang salah. Jangan menyetel terlalu tinggi untuk tugas yang sangat sering, karena dapat menghabiskan banyak memori.
Dapatkan tugas berdasarkan id. Mengembalikan Janji yang diselesaikan dengan tugas atau dengan null
jika tugas tidak ada.
Bidang tugas yang dapat Anda gunakan:
total - total kemajuan tugas
kemajuan - kemajuan tugas saat ini
hasil - hasil tugas
kesalahan - kesalahan tugas
Batalkan tugas. Mengembalikan Janji yang diselesaikan dengan tugas.
Catatan. Anda hanya dapat membatalkan tugas tanpa orang tua.
Mulai pekerja dan mulai konsumsi data tugas. Return Promise
, diselesaikan ketika antrian sudah siap (panggil .ready()
di dalam).
Jika pool
ditentukan di konstruktor, hanya tugas yang dirutekan ke tarikan ini yang akan digunakan.
Berhenti menerima tugas baru dari antrean. Return Promise
, diselesaikan ketika semua tugas aktif di pekerja ini selesai.
Return Promise
, diselesaikan ketika antrian siap beroperasi (setelah acara 'connect', lihat di bawah).
Perbarui opsi konstruktor, kecuali redisURL.
idoit
adalah instance EventEmitter
yang memicu beberapa peristiwa:
ready
ketika koneksi redis aktif dan perintah dapat dijalankan (tugas dapat didaftarkan tanpa koneksi)
error
ketika kesalahan telah terjadi.
task:progress
, task:progress:<task_id>
- saat pembaruan tugas berlangsung. Data acara adalah: { id, uid, total, kemajuan }
task:end
, task:end:<task_id>
- saat tugas berakhir. Data peristiwa adalah: { id, uid }
Buat Tugas baru dengan parameter opsional.
Ganti properti tugas. Misalnya, Anda mungkin ingin menetapkan tugas grup/rantai tertentu ke kumpulan lain.
Jalankan tugas segera. Mengembalikan Janji yang diselesaikan dengan id tugas.
Tunda eksekusi tugas untuk delay
milidetik (atau hingga task.postponeDelay
).
Mengembalikan Janji yang diselesaikan dengan id tugas.
Mulai ulang tugas yang sedang berjalan.
add_retry (Boolean) - opsional, apakah akan menambah jumlah percobaan ulang atau tidak (default: false)
jika true
, jumlah percobaan ulang meningkat, dan tugas tidak dimulai ulang jika melebihi
jika false
, jumlah percobaan ulang tetap sama, sehingga tugas dapat dimulai ulang sendiri tanpa batas waktu
penundaan (Angka) penundaan sebelum memulai ulang dalam milidetik (default: task.retryDelay
).
Catatan, idoit
sudah memiliki logika restart bawaan pada kesalahan tugas. Mungkin sebaiknya Anda tidak menggunakan cara ini secara langsung. Ini diungkap untuk kasus-kasus yang sangat spesifik.
Tingkatkan kemajuan tugas saat ini.
Mengembalikan Janji yang diselesaikan dengan id tugas.
Perbarui tenggat waktu tugas saat ini.
Mengembalikan Janji yang diselesaikan dengan id tugas.
Buat tugas baru dengan menjalankan anak secara paralel.
antrian.grup([ antrian.anak-anak1(), antrian.anak-anak2(), antrian.children3()]).run()
Hasil grup adalah hasil array anak yang tidak diurutkan.
Buat tugas baru, jalankan anak secara seri. Jika ada anak yang gagal - rantai juga gagal.
antrian.registerTask('kalikan', (a, b) => a * b);antrian.registerTask('kurangi', (a, b) => a - b);antrian.rantai([ antrian.multiply(2, 3), // 2 * 3 = 6 antrian.kurangi(10), // 10 - 6 = 4 antrian.multiply(3) // 3 * 4 = 12]).run()
Hasil tugas sebelumnya diteruskan sebagai argumen terakhir dari tugas berikutnya. Hasil rantai adalah hasil tugas terakhir dalam rantai.
Cara khusus untuk menjalankan pemetaan besar dengan gaya malas (sesuai permintaan). Lihat komentar di bawah.
// daftarkan iterator taskqueue.registerTask({ nama: 'malas_mapper', baseClass: Antrian.Iterator, // Metode ini dipanggil saat tugas dimulai dan di setiap akhir anak. Itu bisa saja // fungsi generator atau fungsi yang mengembalikan `Janji`. * iterate(state) {// ...// Tiga jenis status keluaran yang mungkin: berakhir, tidak melakukan apa pun & data baru.//// 1. `null` - akhir tercapai, iterator tidak boleh dipanggil lagi.// 2. `{}` - menganggur, ada cukup subtugas dalam antrean, coba panggil// iterator nanti (saat turunan berikutnya selesai).// 3. {// status - status iterator baru yang perlu diingat (misalnya, offset untuk // kueri db), data// tugas apa pun yang dapat diserialkan - rangkaian subtugas baru yang akan dimasukkan ke dalam antrean// }//// PENTING! Iterator dapat dipanggil secara paralel dari pekerja yang berbeda. Kami// menggunakan masukan `status` untuk mengatasi tabrakan pada pembaruan redis. Jadi, jika Anda// membuat subtugas baru://// 1. `status` baru HARUS berbeda (untuk semua status sebelumnya)// 2. susunan `tugas` TIDAK HARUS kosong.//// Dalam kasus lain, Anda harus memberi sinyal tentang 'end' atau 'idle'.//// Kombinasi yang tidak valid akan menyebabkan 'end' + error event.//return { state: newState, task: chunksArray}; }});// jalankan iteratorqueue.lazy_mapper().run();
Mengapa keajaiban gila ini ditemukan?
Bayangkan Anda perlu membangun kembali 10 juta postingan forum. Anda ingin membagi pekerjaan menjadi bagian-bagian kecil yang sama, tetapi postingan tidak memiliki enumerasi bilangan bulat berurutan, hanya ID mongo. Apa yang bisa kamu lakukan?
Permintaan skip
+ limit
langsung sangat mahal pada koleksi besar di database mana pun.
Anda tidak dapat membagi berdasarkan interval tanggal, karena kepadatan postingan sangat bervariasi dari postingan pertama hingga terakhir.
Anda dapat menambahkan bidang yang diindeks dengan nomor acak ke setiap posting. Kemudian bagi berdasarkan interval. Itu akan berhasil, tetapi akan menyebabkan akses disk acak - tidak keren.
Solusinya adalah dengan menggunakan iterative mapper, yang dapat mengingat "posisi sebelumnya". Dalam hal ini, Anda akan melakukan permintaan range
+ limit
alih-alih skip
+ limit
. Itu bekerja dengan baik dengan database. Bonus tambahannya adalah:
Anda tidak perlu menyimpan semua subtugas dalam antrean. Misalnya, Anda dapat membuat 100 potongan dan menambahkan 100 potongan berikutnya ketika potongan sebelumnya akan selesai.
Fase pemetaan menjadi terdistribusi dan Anda dapat mulai memantau kemajuan total dengan segera.
Redis yang dijalankan dengan cepat melalui buruh pelabuhan:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker stop redis1 buruh pelabuhan rm redis1
Tentu kita mengenal kue, seledri dan akka. Target kami adalah memiliki keseimbangan antara kesederhanaan dan kekuatan. Jadi, kami tidak tahu apakah idoit
berfungsi dengan baik dalam cluster dengan ribuan instance. Tapi itu akan baik-baik saja dalam volume yang lebih kecil dan sangat mudah digunakan.
kue tidak sesuai dengan kebutuhan kita, karena:
konsep "prioritas" itu tidak fleksibel dan tidak melindungi dengan baik dari kuncian oleh tugas-tugas berat
tidak ada pengelompokan/rangkaian tugas dan sebagainya
tidak ada jaminan kuat atas konsistensi data
Di idoit
kami peduli tentang:
grup tugas/operasi rantai & meneruskan data antar tugas (mirip dengan seledri)
kumpulan pekerja untuk mengisolasi eksekusi tugas berdasarkan jenisnya.
mudah digunakan & diinstal (hanya diperlukan redis, dapat dijalankan pada proses yang ada)
konsistensi akhir dari data yang disimpan
gula esensial seperti penjadwal bawaan
pemetaan berulang untuk muatan besar (fitur unik, sangat berguna untuk banyak tugas pemeliharaan)
pelacakan kemajuan tugas
menghindari kunci global
Redis masih bisa menjadi titik kegagalan, tapi itu harga yang bisa diterima demi kesederhanaan. Karenanya Anda bisa mendapatkan ketersediaan yang lebih baik melalui bus pesan terdistribusi seperti RMQ. Namun dalam banyak kasus, yang lebih penting adalah menjaga segala sesuatunya tetap sederhana. Dengan idoit
Anda dapat menggunakan kembali teknologi yang ada tanpa biaya tambahan.