เอ็นจิ้นคิวงานที่ได้รับการสนับสนุน Redis พร้อมการควบคุมงานขั้นสูงและความสอดคล้องในที่สุด
การจัดกลุ่มงาน การผูกมัด ตัววนซ้ำสำหรับช่วงขนาดใหญ่
เลื่อนงานและกำหนดเวลาการทำงาน
การกระจายโหลด + พูลผู้ปฏิบัติงาน
ง่ายต่อการฝัง
idoit
ให้การควบคุมขั้นสูงเพื่อนำไปใช้
การจัดกลุ่ม งาน group
พิเศษ ดำเนินงานของเด็ก ๆ และรอจนกว่างานทั้งหมดจะเสร็จสิ้น มีประโยชน์สำหรับแผนที่/ลดตรรกะ
การผูกมัด งาน chain
พิเศษดำเนินการเด็กทีละคน ยังมีประโยชน์สำหรับการลดแผนที่หรือแยกงานที่ซับซ้อนมากให้เป็นขั้นตอนที่ง่ายกว่าอีกด้วย
ตัววนซ้ำการทำแผนที่ คุณสมบัติพิเศษสำหรับน้ำหนักบรรทุกขนาดใหญ่ เพื่อผลิตชิ้นส่วนตามความต้องการ ประโยชน์:
ไม่มีความล่าช้าในเฟสการทำแผนที่ การประมวลผลชิ้นส่วนจะเริ่มต้นทันที
ง่ายต่อการปรับแต่งการสืบค้น DB เพื่อสร้างชิ้นส่วนที่มีขนาดเท่ากัน (ข้าม + จำกัดการสืบค้นจะช้ามากกับข้อมูลขนาดใหญ่)
ความคืบหน้า . เมื่อคุณใช้สถานการณ์กลุ่ม/ห่วงโซ่/แผนที่ คุณสามารถติดตามความคืบหน้าทั้งหมดผ่านทางพาเรนต์ระดับบนได้อย่างง่ายดาย งานแบบสแตนด์อโลนที่ใช้เวลานานสามารถแจ้งให้ผู้ใช้ทราบเกี่ยวกับการเปลี่ยนแปลงความคืบหน้าได้
พูลคนงาน คุณสามารถแบ่งงานตามกระบวนการที่แตกต่างกันได้ เช่น ถ้าคุณไม่ต้องการให้งานหนักมาขวางงานเบา
ผู้กำหนดตารางเวลา cron ในตัวช่วยให้สามารถดำเนินงานตามกำหนดเวลาที่กำหนด
ข้อมูลทั้งหมดใน Redis มีความสอดคล้องกันตลอดไป
งานไม่สามารถสูญหายได้ แต่สามารถทำงานได้สองครั้งในกรณี Edge (หากกระบวนการขัดข้องเมื่อฟังก์ชันงานกำลังจะเสร็จสิ้น)
ความคืบหน้าสามารถนับ "เร็วขึ้น" หากใช้ task.progressAdd()
และประมวลผลข้อขัดข้องก่อนที่งานจะเสร็จสิ้น แต่นั่นไม่สำคัญ เนื่องจากข้อมูลดังกล่าวสามารถใช้ได้เฉพาะกับการอัปเดตแถบความคืบหน้าของอินเทอร์เฟซเท่านั้น ในกรณีส่วนใหญ่คุณจะไม่เห็นความแตกต่าง
ต้องใช้ node.js
6+ และ redis
3.0+
npm ติดตั้ง idoit --save
redisURL (สตริง) - URL การเชื่อมต่อ redis
การทำงานพร้อมกัน (จำนวน) - งานสูงสุดที่จะใช้พร้อมกันโดยผู้ปฏิบัติงานคนเดียว โดยค่าเริ่มต้นคือ 100 งาน
พูล (สตริง) - ชื่อพูลผู้ปฏิบัติงาน "ค่าเริ่มต้น" หากไม่ได้ตั้งค่า ใช้หากอินสแตนซ์คิวนี้ใช้งานเท่านั้น (หลัง .start()
) คุณสามารถกำหนดเส้นทางงานไปยังกลุ่มผู้ปฏิบัติงานเฉพาะเพื่อหลีกเลี่ยงการล็อกที่ไม่ต้องการ คุณสามารถตั้ง pool
เป็น Array, [ 'pool1', 'pool2' ]
เพื่อใช้งานจากหลายพูล (เพื่อการพัฒนา/การทดสอบ)
ns (สตริง) - เนมสเปซข้อมูล ปัจจุบันใช้เป็นคำนำหน้าคีย์ Redis "idoitqueue:" ตามค่าเริ่มต้น
แนวทางปฏิบัติที่ดีคือมีกลุ่มผู้ปฏิบัติงานแยกต่างหากสำหรับงานบล็อกจำนวนมากและงานที่ไม่บล็อก ตัวอย่างเช่น ไม่มีใครควรปิดกั้นการส่งอีเมลเร่งด่วน ดังนั้น ให้สร้างกระบวนการของผู้ปฏิบัติงานหลายๆ กระบวนการ ปักหมุดกระบวนการเหล่านั้นไปยังกลุ่มต่างๆ และตั้งค่าการทำงานพร้อมกันอย่างเหมาะสม งานที่ไม่มีการบล็อกสามารถยกเลิกพร้อมกันได้ และคุณก็โอเคกับ concurrency
เริ่มต้น = 100 งานการบล็อกควรใช้ทีละงาน ตั้งค่า concurrency
= 1 สำหรับผู้ปฏิบัติงานเหล่านั้น
บันทึก. อาจเกิดขึ้นได้หากคุณลบงานบางประเภทออกจากแอปของคุณ ในกรณีนี้ ข้อมูลที่ถูกละเลยจะถูกล้างหลังจากผ่านไป 3 วัน
ตัวเลือก:
ชื่อ (สตริง) - ชื่อของงาน
baseClass (ฟังก์ชัน) - ตัวเลือก ตัวสร้างงานฐาน "งาน" ตามค่าเริ่มต้น
init (ฟังก์ชัน) - ทางเลือกที่ใช้สำหรับการเริ่มต้นงาน async ควรส่งคืน Promise
สิ่งนี้ (วัตถุ) - งานปัจจุบัน (ผลรวมของงานมีอยู่ในชื่อ this.total
)
TaskID (ฟังก์ชัน) - เป็นทางเลือก ควรส่งคืนรหัสงานใหม่ จำเป็นสำหรับการสร้างงาน "พิเศษเฉพาะ" เท่านั้น ให้ส่งคืนค่าสุ่มตามค่าเริ่มต้น เรียกว่า: function (taskData)
น้ำตาล: หากคุณส่งสตริงธรรมดา มันจะถูกรวมเข้ากับฟังก์ชัน ซึ่งจะส่งคืนสตริงนี้เสมอ
กระบวนการ (ฟังก์ชัน) - ฟังก์ชันงานหลักที่เรียกว่า: task.process(...args)
ควรคืน Promise
นี่ (วัตถุ) - งานปัจจุบัน
ลองอีกครั้ง (หมายเลข) - ทางเลือก จำนวนการลองใหม่เมื่อเกิดข้อผิดพลาด ค่าเริ่มต้น 2
retryDelay (หมายเลข) - เป็นทางเลือก หน่วงเวลาเป็น ms หลังจากลองอีกครั้ง ค่าเริ่มต้น 60,000 ms
หมดเวลา (หมายเลข) - ทางเลือก, หมดเวลาดำเนินการ, ค่าเริ่มต้น 120,000 ms
ผลรวม (ตัวเลข) - ทางเลือก ค่าความคืบหน้าสูงสุด ค่าเริ่มต้น 1 หากคุณไม่แก้ไขพฤติกรรม ความคืบหน้าจะเริ่มต้นด้วย 0 และกลายเป็น 1 เมื่อสิ้นสุดงาน
postponeDelay (หมายเลข) - ทางเลือก หากการเลื่อนถูกเรียกโดยไม่ล่าช้า ความล่าช้าจะถือว่าเท่ากับค่านี้ (ในหน่วยมิลลิวินาที)
cron (สตริง) - ทางเลือก, สตริง cron ("15 */6 * * *"), ค่าว่างเริ่มต้น
แทร็ก (หมายเลข) - ค่าเริ่มต้น 3600000ms (1 ชม.) ถึงเวลาจดจำงานที่กำหนดเวลาไว้จาก cron เพื่อหลีกเลี่ยงการรันซ้ำหากเซิร์ฟเวอร์หลายตัวในคลัสเตอร์มีนาฬิกาผิด อย่าตั้งค่าสูงเกินไปสำหรับงานที่ใช้บ่อยมาก เพราะอาจกินพื้นที่หน่วยความจำได้มาก
รับงานตามรหัส ส่งคืน Promise ที่แก้ไขด้วยงานหรือเป็น null
หากไม่มีงาน
ฟิลด์งานที่คุณสามารถใช้:
รวม - ความคืบหน้าของงานทั้งหมด
ความคืบหน้า - ความคืบหน้าของงานปัจจุบัน
ผลลัพธ์ - ผลลัพธ์ของงาน
ข้อผิดพลาด - ข้อผิดพลาดของงาน
ยกเลิกงาน ส่งคืนสัญญาที่ได้รับการแก้ไขด้วยงาน
บันทึก. คุณสามารถยกเลิกได้เฉพาะงานที่ไม่มีผู้ปกครอง
เริ่มต้นผู้ปฏิบัติงานและเริ่มต้นการใช้ข้อมูลงาน Return Promise
แก้ไขเมื่อคิวพร้อม (โทร .ready()
ภายใน)
หากระบุ pool
ใน cunstructor เฉพาะงานที่ถูกกำหนดเส้นทางไปยังการดึงนี้เท่านั้นที่จะถูกใช้
งดรับงานใหม่จากคิว Return Promise
จะได้รับการแก้ไขเมื่องานที่กำลังดำเนินการอยู่ในผู้ปฏิบัติงานรายนี้เสร็จสมบูรณ์
Return Promise
แก้ไขเมื่อคิวพร้อมใช้งาน (หลังจากเหตุการณ์ 'เชื่อมต่อ' ดูด้านล่าง)
อัปเดตตัวเลือกตัวสร้าง ยกเว้น redisURL
idoit
เป็น EventEmitter
ที่เริ่มเหตุการณ์บางอย่าง:
ready
เมื่อการเชื่อมต่อ Redis หมดลงและสามารถดำเนินการคำสั่งได้ (สามารถลงทะเบียนงานได้โดยไม่ต้องเชื่อมต่อ)
error
เมื่อมีข้อผิดพลาดเกิดขึ้น
task:progress
, task:progress:<task_id>
- เมื่องานอัปเดตความคืบหน้า ข้อมูลเหตุการณ์คือ: { id, uid, ผลรวม, ความคืบหน้า }
task:end
, task:end:<task_id>
- เมื่องานสิ้นสุด ข้อมูลเหตุการณ์คือ: { id, uid }
สร้างงานใหม่ด้วยพารามิเตอร์เสริม
แทนที่คุณสมบัติของงาน ตัวอย่างเช่น คุณอาจต้องการมอบหมายงานกลุ่ม/ลูกโซ่เฉพาะให้กับกลุ่มอื่น
รันงานทันที ส่งคืนสัญญาที่ได้รับการแก้ไขด้วยรหัสงาน
เลื่อนการดำเนินการงานเพื่อ delay
มิลลิวินาที (หรือเป็น task.postponeDelay
)
ส่งคืนสัญญาที่ได้รับการแก้ไขด้วยรหัสงาน
รีสตาร์ทงานที่กำลังทำงานอยู่
add_retry (บูลีน) - เป็นทางเลือก ไม่ว่าจะเพิ่มจำนวนการลองใหม่หรือไม่ (ค่าเริ่มต้น: false)
หากเป็น true
จำนวนการลองใหม่จะเพิ่มขึ้น และงานจะไม่เริ่มต้นใหม่ในกรณีที่เกินจำนวนนั้น
หากเป็น false
จำนวนการลองใหม่จะยังคงเหมือนเดิม ดังนั้นงานจึงสามารถรีสตาร์ทตัวเองได้อย่างไม่มีกำหนด
ดีเลย์ (ตัวเลข) ดีเลย์ก่อนรีสตาร์ทในหน่วยมิลลิวินาที (ค่าเริ่มต้น: task.retryDelay
)
หมายเหตุ idoit
มีตรรกะการรีสตาร์ทในตัวสำหรับข้อผิดพลาดของงานแล้ว อาจไม่ควรใช้วิธีนี้โดยตรง มีการเปิดเผยสำหรับกรณีที่เฉพาะเจาะจงมาก
เพิ่มความคืบหน้าของงานปัจจุบัน
ส่งคืนสัญญาที่ได้รับการแก้ไขด้วยรหัสงาน
อัพเดตกำหนดเวลางานปัจจุบัน
ส่งคืนสัญญาที่ได้รับการแก้ไขด้วยรหัสงาน
สร้างงานใหม่ ดำเนินการเด็กไปพร้อมๆ กัน
คิว.กลุ่ม([ คิวเด็ก1() คิวเด็ก2() คิวเด็ก3()]).รัน()
ผลลัพธ์ของกลุ่มคือผลลัพธ์อาร์เรย์ลูกที่ไม่เรียงลำดับ
สร้างงานใหม่ ดำเนินการเด็ก ๆ ตามลำดับ หากมีลูกคนใดล้มเหลว - โซ่ก็ล้มเหลวเช่นกัน
Queue.registerTask('คูณ', (a, b) => a * b);queue.registerTask('subtract', (a, b) => a - b);queue.chain([ Queue.multiply(2, 3), // 2 * 3 = 6 Queue.subtract(10), // 10 - 6 = 4 Queue.multiply(3) // 3 * 4 = 12]).run()
ผลลัพธ์ของงานก่อนหน้าส่งผ่านเป็นอาร์กิวเมนต์สุดท้ายของงานถัดไป ผลลัพธ์ของลูกโซ่เป็นผลมาจากงานสุดท้ายในลูกโซ่
วิธีพิเศษในการรันการทำแผนที่ขนาดใหญ่ในรูปแบบขี้เกียจ (ตามความต้องการ) ดูความคิดเห็นด้านล่าง
// ลงทะเบียนตัววนซ้ำ Taskqueue.registerTask ({ ชื่อ: 'lazy_mapper', baseClass: Queue.Iterator, // เมธอดนี้ถูกเรียกใช้เมื่องานเริ่มต้นและที่จุดสิ้นสุดของเด็กทุกคน มันสามารถเป็นได้ // ฟังก์ชันตัวสร้างหรือฟังก์ชันที่ส่งคืน "สัญญา" * วนซ้ำ (สถานะ) {// ...// สถานะเอาต์พุตที่เป็นไปได้สามประเภท: สิ้นสุดแล้ว, ไม่ต้องทำอะไรเลย & ข้อมูลใหม่//// 1. `null` - ถึงจุดสิ้นสุดแล้ว ไม่ควรเรียกตัววนซ้ำอีกต่อไป// 2. `{}` - ไม่ได้ใช้งาน มีงานย่อยเพียงพอในคิว พยายามเรียก// ตัววนซ้ำในภายหลัง (เมื่อรายการย่อยถัดไปเสร็จสิ้น)// 3. {// สถานะ - สถานะตัววนซ้ำใหม่ที่ต้องจำ (เช่น ออฟเซ็ตสำหรับ // db)) ข้อมูลซีเรียลไลซ์ใด ๆ // งาน - อาร์เรย์ของงานย่อยใหม่ที่จะพุชเข้าสู่คิว // }//// สำคัญ! Iterator สามารถเรียกพร้อมกันจากผู้ปฏิบัติงานที่แตกต่างกัน เรา// ใช้อินพุต `state` เพื่อแก้ไขข้อขัดแย้งในการอัปเดต Redis ดังนั้น หากคุณ// สร้างงานย่อยใหม่/// 1. `สถานะ` ใหม่จะต้องแตกต่าง (สำหรับสถานะก่อนหน้าทั้งหมด)// 2. `อาร์เรย์งาน` จะต้องไม่ว่างเปล่า//// ในกรณีอื่น ๆ คุณ ควรส่งสัญญาณเกี่ยวกับ 'end' หรือ 'idle'.//// การรวมกันที่ไม่ถูกต้องจะทำให้เกิด 'end' + เหตุการณ์ข้อผิดพลาด // return { state: newState, งาน: chunksArray}; }});// รัน iteratorqueue.lazy_mapper().run();
เหตุใดเวทมนตร์อันบ้าคลั่งนี้จึงถูกประดิษฐ์ขึ้น?
ลองจินตนาการว่าคุณต้องสร้างโพสต์ในฟอรัมขึ้นมาใหม่ 10 ล้านโพสต์ คุณต้องการแบ่งงานออกเป็นชิ้นเล็กๆ เท่าๆ กัน แต่โพสต์ไม่มีการแจกแจงจำนวนเต็มตามลำดับ มีเพียง mongo ID เท่านั้น คุณทำอะไรได้บ้าง?
คำขอ skip
โดยตรง + limit
นั้นมีราคาแพงมากสำหรับคอลเลกชันขนาดใหญ่ในฐานข้อมูลใด ๆ
คุณไม่สามารถแบ่งตามช่วงวันที่ได้ เนื่องจากความหนาแน่นของโพสต์จะแตกต่างกันไปมากตั้งแต่โพสต์แรกจนถึงโพสต์สุดท้าย
คุณสามารถเพิ่มฟิลด์ที่จัดทำดัชนีพร้อมตัวเลขสุ่มให้กับทุกโพสต์ได้ แล้วแบ่งตามระยะ ซึ่งจะใช้งานได้ แต่จะทำให้เกิดการเข้าถึงดิสก์แบบสุ่ม - ไม่เจ๋ง
วิธีแก้ไขคือใช้ตัวทำแผนที่ซ้ำซึ่งสามารถจดจำ "ตำแหน่งก่อนหน้า" ได้ ในกรณีนี้ คุณจะทำการร้องขอ range
+ limit
แทน skip
+ limit
ทำงานได้ดีกับฐานข้อมูล โบนัสเพิ่มเติมคือ:
คุณไม่จำเป็นต้องเก็บงานย่อยทั้งหมดไว้ในคิว ตัวอย่างเช่น คุณสามารถสร้าง 100 ชิ้นและเพิ่ม 100 ชิ้นถัดไปเมื่อชิ้นก่อนหน้ากำลังจะเสร็จสิ้น
เฟสการทำแผนที่จะกระจายออกไป และคุณสามารถเริ่มติดตามความคืบหน้าทั้งหมดได้ทันที
Quik-run redis ผ่านนักเทียบท่า:
# startdocker run -d -p 6379:6379 --name redis1 redis# stopdocker หยุด redis1 นักเทียบท่า rm redis1
ด้วยเหตุนี้เราจึงคุ้นเคยกับคือ ขึ้นฉ่าย และอักกะ เป้าหมายของเราคือมีความสมดุลระหว่างความเรียบง่ายและพลัง ดังนั้นเราจึงไม่รู้ว่า idoit
ทำงานได้ดีในคลัสเตอร์ที่มีอินสแตนซ์นับพันหรือไม่ แต่ควรจะใช้ได้ในปริมาณน้อยและใช้งานง่ายมาก
kue ไม่โอเคสำหรับความต้องการของเรา เพราะ:
แนวคิด "ลำดับความสำคัญ" ไม่ยืดหยุ่นและป้องกันการล็อคจากงานหนักได้ไม่ดี
ไม่มีการจัดกลุ่มงาน/ผูกมัดและอื่นๆ
ไม่มีการรับประกันความสอดคล้องของข้อมูลอย่างเข้มงวด
ใน idoit
เราสนใจ:
การดำเนินการกลุ่มงาน/ลูกโซ่ & ส่งข้อมูลระหว่างงาน (คล้ายกับคื่นฉ่าย)
พูลผู้ปฏิบัติงานเพื่อแยกการดำเนินการตามประเภท
ใช้งานง่ายและติดตั้ง (ต้องใช้ Redis เท่านั้น สามารถเรียกใช้ในกระบวนการที่มีอยู่ได้)
ความสอดคล้องในที่สุดของข้อมูลที่เก็บไว้
น้ำตาลที่จำเป็นเช่นตัวกำหนดเวลาในตัว
ตัวทำแผนที่ซ้ำสำหรับเพย์โหลดขนาดใหญ่ (คุณสมบัติพิเศษ มีประโยชน์มากสำหรับงานบำรุงรักษาหลายอย่าง)
การติดตามความคืบหน้าของงาน
หลีกเลี่ยงการล็อคทั่วโลก
Redis ยังคงเป็นจุดล้มเหลว แต่นั่นเป็นราคาที่ยอมรับได้สำหรับความเรียบง่าย ด้วยเหตุนี้ คุณสามารถได้รับความพร้อมใช้งานที่ดีขึ้นผ่านทางบัสข้อความแบบกระจาย เช่น RMQ แต่ในหลายกรณี การทำสิ่งต่างๆ ให้เรียบง่ายเป็นสิ่งสำคัญกว่า ด้วย idoit
คุณสามารถนำเทคโนโลยีที่มีอยู่กลับมาใช้ใหม่ได้โดยไม่มีค่าใช้จ่ายเพิ่มเติม