เพิ่มพลังให้กับคิวของคุณด้วยส่วนหน้าแบบมืออาชีพ:
รับภาพรวมที่สมบูรณ์ของคิวทั้งหมดของคุณ
ตรวจสอบงาน ค้นหา ลองใหม่ หรือเลื่อนตำแหน่งงานที่ล่าช้า
ตัวชี้วัดและสถิติ
และคุณสมบัติอื่น ๆ อีกมากมาย
ลงทะเบียนที่ Taskforce.sh
การใช้งาน CPU น้อยที่สุดเนื่องจากการออกแบบที่ไม่มีการโพล
การออกแบบที่แข็งแกร่งบนพื้นฐานของ Redis
งานล่าช้า.
กำหนดเวลาและทำซ้ำงานตามข้อกำหนดของ cron
ตัวจำกัดอัตราสำหรับงาน
ลองอีกครั้ง
ลำดับความสำคัญ.
เห็นพ้องต้องกัน
หยุดชั่วคราว/เล่นต่อ—ทั่วโลกหรือภายในเครื่อง
งานหลายประเภทต่อคิว
ฟังก์ชันการประมวลผลแบบเธรด (แซนด์บ็อกซ์)
การกู้คืนอัตโนมัติจากกระบวนการขัดข้อง
และก็มาถึงแผนงาน...
การรับทราบงานเสร็จสิ้น (คุณสามารถใช้รูปแบบคิวข้อความในระหว่างนี้)
ความสัมพันธ์ระหว่างงานระหว่างพ่อแม่และลูก
มี UI ของบุคคลที่สามบางส่วนที่คุณสามารถใช้เพื่อตรวจสอบ:
BullMQ
คณะทำงานเฉพาะกิจ
บูล v3
คณะทำงานเฉพาะกิจ
กระดานกระทิง
ตัวแทนกระทิง
มอนิเตอร์วัว
มอนิเตอร์
กระทิง <= v2
มาทาดอร์
ปฏิกิริยากระทิง
ตูเรโร
ด้วยผู้ส่งออกคิวกระทิง Prometheus
เนื่องจากมีโซลูชันคิวงานอยู่สองสามโซลูชัน ต่อไปนี้เป็นตารางเปรียบเทียบ:
Dragonfly เป็นการทดแทน Redis™ ใหม่ที่สามารถใช้งานร่วมกับ BullMQ ได้อย่างสมบูรณ์ และนำข้อได้เปรียบที่สำคัญบางประการเหนือ Redis™ เช่น ประสิทธิภาพที่ดีขึ้นอย่างมากโดยใช้แกน CPU ทั้งหมดที่มีอยู่ ตลอดจนโครงสร้างข้อมูลที่มีประสิทธิภาพหน่วยความจำเร็วขึ้นและมากขึ้น อ่านเพิ่มเติมที่นี่เกี่ยวกับวิธีการใช้กับ BullMQ | |
หากคุณต้องการอินสแตนซ์ Redis ที่ใช้งานจริงคุณภาพสูงสำหรับโปรเจ็กต์ Bull ของคุณ โปรดพิจารณาสมัคร Memetria สำหรับ Redis ซึ่งเป็นผู้นำในโฮสติ้ง Redis ที่ทำงานได้อย่างสมบูรณ์แบบกับ BullMQ ใช้รหัสโปรโมชั่น "BULLMQ" เมื่อสมัครเพื่อช่วยเราสนับสนุนการพัฒนา BullMQ! |
คุณสมบัติ | BullMQ-Pro | BullMQ | วัว | คิว | ผึ้ง | กำหนดการ |
---|---|---|---|---|---|---|
แบ็กเอนด์ | ทำซ้ำ | ทำซ้ำ | ทำซ้ำ | ทำซ้ำ | ทำซ้ำ | มองโก |
สิ่งที่สังเกตได้ | ||||||
ขีดจำกัดอัตรากลุ่ม | ||||||
การสนับสนุนกลุ่ม | ||||||
การสนับสนุนแบทช์ | ||||||
การพึ่งพาพ่อแม่/ลูก | ||||||
ลำดับความสำคัญ | ||||||
เห็นพ้องด้วย | ||||||
งานล่าช้า | ||||||
เหตุการณ์ระดับโลก | ||||||
ตัวจำกัดอัตรา | ||||||
หยุดชั่วคราว/ดำเนินการต่อ | ||||||
ผู้ปฏิบัติงานแซนด์บ็อกซ์ | ||||||
งานที่ทำซ้ำได้ | ||||||
ปฏิบัติการปรมาณู | ||||||
ความพากเพียร | ||||||
UI | ||||||
ปรับให้เหมาะสมสำหรับ | งาน/ข้อความ | งาน/ข้อความ | งาน/ข้อความ | งาน | ข้อความ | งาน |
ติดตั้ง npm bull --save
หรือ
เส้นด้ายเพิ่มวัว
ข้อกำหนด: Bull ต้องการ Redis เวอร์ชันที่มากกว่าหรือเท่ากับ 2.8.18
npm ติดตั้ง @types/bull --save-dev
เส้นด้ายเพิ่ม --dev @types/bull
ปัจจุบันคำจำกัดความยังคงอยู่ใน repo ของ SureTyped
เรายินดีรับการสนับสนุนทุกประเภท ทั้งการแก้ไขโค้ด คุณลักษณะใหม่ หรือการปรับปรุงเอกสาร การจัดรูปแบบโค้ดบังคับใช้โดยพริตตี้ สำหรับคอมมิต โปรดปฏิบัติตามแบบแผนคอมมิตทั่วไป รหัสทั้งหมดจะต้องผ่านกฎ Lint และชุดทดสอบก่อนจึงจะสามารถรวมเข้ากับการพัฒนาได้
const Queue = ต้องการ ('bull'); const videoQueue = คิวใหม่ ('การแปลงรหัสวิดีโอ', 'redis://127.0.0.1:6379'); const audioQueue = คิวใหม่ ('การแปลงรหัสเสียง', { redis: { พอร์ต : 6379, โฮสต์: '127.0.0.1', รหัสผ่าน: 'foobared' } }); // ระบุการเชื่อมต่อ Redis โดยใช้ objectconst imageQueue = new Queue('image transcoding');const pdfQueue = new Queue('pdf transcoding');videoQueue.process(function (job, done) { // job.data มีข้อมูลที่กำหนดเองที่ส่งผ่านเมื่อมีการสร้างงาน // job.id มี id ของงานนี้ // แปลงรหัสวิดีโอแบบอะซิงโครนัสและรายงานความคืบหน้า งาน.ความคืบหน้า(42); // โทรเสร็จสิ้นเมื่อเสร็จแล้ว เสร็จแล้ว(); // หรือให้ข้อผิดพลาดหากมีข้อผิดพลาด เสร็จสิ้น (ข้อผิดพลาดใหม่ ('ข้อผิดพลาดในการแปลงรหัส')); // หรือส่งผ่านผลลัพธ์ เสร็จสิ้น (null, { framerate: 29.5 /* ฯลฯ... */ }); // หากงานเกิดข้อผิดพลาดที่ไม่สามารถจัดการได้ งานนั้นจะได้รับการจัดการอย่างถูกต้องเช่นกัน โยนข้อผิดพลาดใหม่ ('ข้อผิดพลาดที่ไม่คาดคิด');}); audioQueue.process (ฟังก์ชั่น (งานเสร็จแล้ว) { // แปลงรหัสเสียงแบบอะซิงโครนัสและรายงานความคืบหน้า งาน.ความคืบหน้า(42); // โทรเสร็จสิ้นเมื่อเสร็จแล้ว เสร็จแล้ว(); // หรือให้ข้อผิดพลาดหากมีข้อผิดพลาด เสร็จสิ้น (ข้อผิดพลาดใหม่ ('ข้อผิดพลาดในการแปลงรหัส')); // หรือส่งผ่านผลลัพธ์ เสร็จสิ้น (null, { อัตราตัวอย่าง: 48000 /* ฯลฯ... */ }); // หากงานเกิดข้อผิดพลาดที่ไม่สามารถจัดการได้ งานนั้นจะได้รับการจัดการอย่างถูกต้องเช่นกัน โยนข้อผิดพลาดใหม่ ('ข้อผิดพลาดที่ไม่คาดคิด');}); imageQueue.process (ฟังก์ชั่น (งานเสร็จแล้ว) { // แปลงรหัสรูปภาพแบบอะซิงโครนัสและรายงานความคืบหน้า งาน.ความคืบหน้า(42); // โทรเสร็จสิ้นเมื่อเสร็จแล้ว เสร็จแล้ว(); // หรือให้ข้อผิดพลาดหากมีข้อผิดพลาด เสร็จสิ้น (ข้อผิดพลาดใหม่ ('ข้อผิดพลาดในการแปลงรหัส')); // หรือส่งผ่านผลลัพธ์ เสร็จแล้ว(null, { ความกว้าง: 1280, สูง: 720 /* ฯลฯ... */ }); // หากงานเกิดข้อผิดพลาดที่ไม่สามารถจัดการได้ งานนั้นจะได้รับการจัดการอย่างถูกต้องเช่นกัน โยนข้อผิดพลาดใหม่ ('ข้อผิดพลาดที่ไม่คาดคิด');}); pdfQueue.process (ฟังก์ชั่น (งาน) { // โปรเซสเซอร์ยังสามารถคืนสัญญาแทนการใช้การโทรกลับที่ทำเสร็จแล้ว ส่งคืน pdfAsyncProcessor();});videoQueue.add({ วิดีโอ: 'http://example.com/video1.mov' });audioQueue.add({ เสียง: 'http://example.com/audio1.mp3 ' });imageQueue.add({ รูปภาพ: 'http://example.com/image1.tiff' });
หรือคุณสามารถคืนสัญญาแทนการใช้การโทรกลับ done
:
videoQueue.process(function (job) { // อย่าลืมลบการโทรกลับที่ทำเสร็จแล้ว! // เพียงแค่คืนสัญญา ส่งคืน fetchVideo(job.data.url).then(transcodeVideo); // จัดการกับการปฏิเสธสัญญา return Promise.reject (ข้อผิดพลาดใหม่ ('การแปลงรหัสข้อผิดพลาด')); // ส่งผ่านค่าที่สัญญาได้รับการแก้ไขด้วยเหตุการณ์ "เสร็จสมบูรณ์" return Promise.resolve ({ อัตราเฟรม: 29.5 /* ฯลฯ... */ }); // หากงานเกิดข้อผิดพลาดที่ไม่สามารถจัดการได้ งานนั้นจะได้รับการจัดการอย่างถูกต้องเช่นกัน โยนข้อผิดพลาดใหม่ ('ข้อผิดพลาดบางอย่างที่ไม่คาดคิด'); //เหมือนกับ. return Promise.reject (ข้อผิดพลาดใหม่ ('ข้อผิดพลาดที่ไม่คาดคิดบางอย่าง'));});
ฟังก์ชันกระบวนการสามารถรันในกระบวนการที่แยกต่างหากได้ สิ่งนี้มีข้อดีหลายประการ:
กระบวนการนี้อยู่ในแซนด์บ็อกซ์ ดังนั้นหากเกิดข้อขัดข้อง ก็จะไม่ส่งผลกระทบต่อผู้ปฏิบัติงาน
คุณสามารถเรียกใช้โค้ดการบล็อกได้โดยไม่กระทบต่อคิว (งานจะไม่หยุดชะงัก)
การใช้งาน CPU แบบมัลติคอร์ที่ดีขึ้นมาก
เชื่อมต่อกับ Redis น้อยลง
เพื่อที่จะใช้คุณสมบัตินี้ เพียงแค่สร้างไฟล์แยกต่างหากด้วยตัวประมวลผล:
// processor.jsmodule.exports = ฟังก์ชั่น (งาน) { //ทำงานหนักไปหน่อย กลับ Promise.resolve (ผลลัพธ์);}
และกำหนดโปรเซสเซอร์ดังนี้:
// Single process:queue.process('/path/to/my/processor.js');// คุณสามารถใช้การทำงานพร้อมกันได้เช่นกัน:queue.process(5, '/path/to/my/processor.js' );// และตั้งชื่อโปรเซสเซอร์:queue.process('my processor', 5, '/path/to/my/processor.js');
สามารถเพิ่มงานลงในคิวและประมวลผลซ้ำๆ ตามข้อกำหนดของ cron:
PaymentQueue.process(function (งาน) {// ตรวจสอบการชำระเงิน - // ทำซ้ำงานชำระเงินทุกวัน เวลา 03:15 น. PaymentsQueue.add(ข้อมูลการชำระเงิน, { ทำซ้ำ: { cron: '15 3 * * *' } });
โปรดทราบว่าให้ตรวจสอบนิพจน์ของคุณที่นี่เพื่อยืนยันว่าถูกต้อง: เครื่องมือสร้างนิพจน์ cron
คิวสามารถหยุดชั่วคราวและดำเนินการต่อได้ทั่วโลก (ผ่าน true
เพื่อหยุดการประมวลผลชั่วคราวสำหรับผู้ปฏิบัติงานรายนี้):
Queue.pause().แล้ว(ฟังก์ชั่น () { // คิวหยุดชั่วคราวแล้ว});queue.resume().then(function () { // คิวกลับมาดำเนินการต่อแล้ว})
คิวปล่อยเหตุการณ์ที่เป็นประโยชน์บางอย่างออกมา เช่น...
.on('เสร็จสมบูรณ์', ฟังก์ชั่น (งาน, ผลลัพธ์) { // งานเสร็จสมบูรณ์พร้อมผลลัพธ์เอาท์พุต!})
สำหรับข้อมูลเพิ่มเติมเกี่ยวกับกิจกรรม รวมถึงรายการกิจกรรมทั้งหมดที่เริ่มทำงาน โปรดดูข้อมูลอ้างอิงกิจกรรม
คิวมีราคาถูก ดังนั้นหากคุณต้องการหลายคิว ให้สร้างคิวใหม่โดยใช้ชื่อที่แตกต่างกัน:
const userJohn = คิวใหม่ ('john'); const userLisa = คิวใหม่ ('lisa');...
อย่างไรก็ตาม ทุกอินสแตนซ์คิวจะต้องมีการเชื่อมต่อ Redis ใหม่ โปรดตรวจสอบวิธีใช้การเชื่อมต่อซ้ำ หรือคุณสามารถใช้ตัวประมวลผลที่กำหนดชื่อเพื่อให้ได้ผลลัพธ์ที่คล้ายกันได้
หมายเหตุ: ตั้งแต่เวอร์ชัน 3.2.0 และสูงกว่า ขอแนะนำให้ใช้โปรเซสเซอร์แบบเธรดแทน
คิวมีความทนทานและสามารถทำงานแบบขนานในหลายเธรดหรือกระบวนการโดยไม่มีความเสี่ยงต่ออันตรายหรือความเสียหายของคิว ตรวจสอบตัวอย่างง่ายๆ นี้โดยใช้คลัสเตอร์เพื่อทำให้งานพร้อมกันระหว่างกระบวนการ:
const Queue = ต้องการ ('bull'); const cluster = ต้องการ ('cluster'); const numWorkers = 8; const Queue = คิวใหม่ ('ทดสอบคิวพร้อมกัน'); if (cluster.isMaster) { สำหรับ (ให้ i = 0; i < numWorkers; i++) {cluster.fork(); - คลัสเตอร์.on('online', function (worker) {// มาสร้างงานสองสามอย่างสำหรับผู้ปฏิบัติงานคิวสำหรับ (let i = 0; i < 500; i++) { Queue.add({ foo: 'bar' }); }; - cluster.on('exit', function (ผู้ปฏิบัติงาน, รหัส, สัญญาณ) {console.log ('worker ' + worker.process.pid + ' dead'); });} อื่น { Queue.process(function (job, jobDone) {console.log('งานที่ทำโดยคนงาน', cluster.worker.id, job.id);jobDone(); -
สำหรับเอกสารฉบับเต็ม โปรดดูข้อมูลอ้างอิงและรูปแบบทั่วไป:
Guide — จุดเริ่มต้นในการพัฒนากับ Bull
การอ้างอิง — เอกสารอ้างอิงพร้อมออบเจ็กต์และวิธีการทั้งหมดที่มี
รูปแบบ — ชุดตัวอย่างสำหรับรูปแบบทั่วไป
ใบอนุญาต — ใบอนุญาต Bull — มันคือ MIT
หากคุณเห็นสิ่งใดที่สามารถใช้เอกสารเพิ่มเติมได้ โปรดส่งคำขอดึง!
คิวมุ่งเป้าไปที่กลยุทธ์การทำงาน "อย่างน้อยหนึ่งครั้ง" ซึ่งหมายความว่าในบางสถานการณ์ งานสามารถถูกประมวลผลได้มากกว่าหนึ่งครั้ง ส่วนใหญ่เกิดขึ้นเมื่อผู้ปฏิบัติงานไม่สามารถล็อกงานที่กำหนดในระหว่างระยะเวลารวมของการประมวลผล
เมื่อผู้ปฏิบัติงานกำลังประมวลผลงาน งานนั้นจะ "ล็อค" ไว้เพื่อให้ผู้ปฏิบัติงานรายอื่นไม่สามารถดำเนินการได้
สิ่งสำคัญคือต้องเข้าใจวิธีการทำงานของการล็อคเพื่อป้องกันไม่ให้งานของคุณสูญเสียการล็อค - หยุดทำงาน - และเป็นผลให้เริ่มต้นใหม่ได้ การล็อคถูกนำไปใช้ภายในโดยการสร้างการล็อคสำหรับ lockDuration
ในช่วงเวลา lockRenewTime
(ซึ่งโดยปกติคือครึ่งหนึ่ง lockDuration
) หาก lockDuration
ผ่านไปก่อนที่จะสามารถต่ออายุการล็อคได้ งานจะถือว่าหยุดทำงานและรีสตาร์ทโดยอัตโนมัติ มันจะถูก ประมวลผลสองครั้ง สิ่งนี้สามารถเกิดขึ้นได้เมื่อ:
กระบวนการโหนดที่เรียกใช้ตัวประมวลผลงานของคุณสิ้นสุดลงโดยไม่คาดคิด
ตัวประมวลผลงานของคุณใช้ CPU มากเกินไปและทำให้ Node event loop หยุดชะงัก และด้วยเหตุนี้ Bull จึงไม่สามารถต่ออายุการล็อคงานได้ (ดู #488 เพื่อดูว่าเราจะตรวจจับสิ่งนี้ได้ดีขึ้นได้อย่างไร) คุณสามารถแก้ไขได้โดยการแบ่งตัวประมวลผลงานของคุณออกเป็นส่วนเล็กๆ เพื่อไม่ให้มีส่วนใดส่วนหนึ่งสามารถบล็อกการวนซ้ำเหตุการณ์ของโหนดได้ หรือคุณสามารถส่งค่าที่มากขึ้นสำหรับการตั้ง lockDuration
(โดยข้อดีคือจะใช้เวลานานกว่าในการจดจำงานที่หยุดจริง)
ด้วยเหตุนี้ คุณควรรับฟังเหตุการณ์ stalled
และบันทึกสิ่งนี้ลงในระบบตรวจสอบข้อผิดพลาดของคุณ เนื่องจากนี่หมายความว่างานของคุณมีแนวโน้มที่จะได้รับการประมวลผลซ้ำซ้อน
เพื่อเป็นการป้องกันงานที่มีปัญหาจะไม่เริ่มต้นใหม่อย่างไม่มีกำหนด (เช่น หากตัวประมวลผลงานขัดข้องกระบวนการโหนดอยู่เสมอ) งานจะถูกกู้คืนจากสถานะจนตรอกสูงสุดครั้ง maxStalledCount
(ค่าเริ่มต้น: 1
)