โปรดดู EG Bull เป็นทางเลือก ขอบคุณ!
Kue เป็นคิวงานที่มีลำดับความสำคัญที่ได้รับการสนับสนุนโดย Redis สร้างขึ้นสำหรับ Node.js.
Protip นี่เป็นเอกสาร Kue ล่าสุดตรวจสอบให้แน่ใจว่าได้อ่าน The Changelist ด้วย
รุ่นล่าสุด:
$ npm install kue
สาขาหลัก:
$ npm install http://github.com/Automattic/kue/tarball/master
ก่อนอื่นสร้าง Queue
งานด้วย kue.createQueue()
:
var kue = require ( 'kue' )
, queue = kue . createQueue ( ) ;
การโทร queue.create()
ด้วยประเภทของงาน ("อีเมล") และข้อมูลงานโดยพลการจะส่งคืน Job
ซึ่งสามารถ save()
ED เพิ่มลงใน Redis ด้วยระดับความสำคัญเริ่มต้นของ "ปกติ" วิธี save()
เลือกรับการโทรกลับโดยตอบกลับด้วย error
หากมีอะไรผิดพลาด คีย์ title
เป็นแบบพิเศษและจะแสดงในรายการงานภายใน UI ทำให้หางานเฉพาะได้ง่ายขึ้น
var job = queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . save ( function ( err ) {
if ( ! err ) console . log ( job . id ) ;
} ) ;
ในการระบุลำดับความสำคัญของงานเพียงแค่เรียกใช้เมธอด priority()
ด้วยตัวเลขหรือชื่อลำดับความสำคัญซึ่งแมปกับตัวเลข
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . priority ( 'high' ) . save ( ) ;
แผนที่ลำดับความสำคัญเริ่มต้นมีดังนี้:
{
low : 10
, normal : 0
, medium : - 5
, high : - 10
, critical : - 15
} ;
โดยค่าเริ่มต้นงานมีเพียงความพยายาม เดียว นั่นคือเมื่อพวกเขาล้มเหลวพวกเขาจะถูกทำเครื่องหมายว่าเป็นความล้มเหลวและยังคงเป็นเช่นนั้นจนกว่าคุณจะเข้าไปแทรกแซง อย่างไรก็ตาม Kue ช่วยให้คุณระบุสิ่งนี้ซึ่งเป็นสิ่งสำคัญสำหรับงานเช่นการถ่ายโอนอีเมลซึ่งเมื่อล้มเหลวอาจลองใหม่โดยไม่มีปัญหา ในการทำเช่นนี้เรียกใช้วิธี .attempts()
ด้วยตัวเลข
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . priority ( 'high' ) . attempts ( 5 ) . save ( ) ;
ความพยายามในการลองใหม่จะเสร็จสิ้นทันทีที่พวกเขาล้มเหลวโดยไม่ล่าช้าแม้ว่างานของคุณจะมีความล่าช้าผ่าน Job#delay
หากคุณต้องการชะลองานที่ได้รับการยกเว้นจากความล้มเหลวอีกครั้ง (เรียกว่า backoff) คุณสามารถใช้วิธี Job#backoff
ในรูปแบบที่แตกต่างกัน:
// Honor job's original delay (if set) at each attempt, defaults to fixed backoff
job . attempts ( 3 ) . backoff ( true )
// Override delay value, fixed backoff
job . attempts ( 3 ) . backoff ( { delay : 60 * 1000 , type : 'fixed' } )
// Enable exponential backoff using original delay (if set)
job . attempts ( 3 ) . backoff ( { type : 'exponential' } )
// Use a function to get a customized next attempt delay value
job . attempts ( 3 ) . backoff ( function ( attempts , delay ) {
//attempts will correspond to the nth attempt failure so it will start with 0
//delay will be the amount of the last delay, not the initial delay unless attempts === 0
return my_customized_calculated_delay ;
} )
ในสถานการณ์สุดท้ายฟังก์ชั่นที่ให้ไว้จะถูกดำเนินการ (ผ่าน Eval) ในแต่ละครั้งเพื่อให้ได้ค่าล่าช้าครั้งต่อไปซึ่งหมายความว่าคุณไม่สามารถอ้างอิงตัวแปรภายนอก/บริบทภายในได้
ผู้ผลิตงานสามารถกำหนดค่าหมดอายุได้ในเวลาที่งานของพวกเขาสามารถอยู่ในสถานะที่ใช้งานอยู่ได้ดังนั้นหากคนงานไม่ตอบกลับในเวลาที่เหมาะสม Kue จะล้มเหลวด้วย TTL exceeded
ข้อความแสดงข้อผิดพลาดที่ป้องกันไม่ให้งานติดอยู่ในสถานะที่ใช้งานอยู่ พร้อมกัน
queue . create ( 'email' , { title : 'email job with TTL' } ) . ttl ( milliseconds ) . save ( ) ;
บันทึกเฉพาะงานช่วยให้คุณสามารถเปิดเผยข้อมูลไปยัง UI ได้ทุกจุดในเวลาตลอดชีวิตของงาน ในการทำเช่นนั้นเพียงเรียกใช้ job.log()
ซึ่งยอมรับสตริงข้อความเช่นเดียวกับตัวแปร arguments สำหรับการสนับสนุนที่มีลักษณะคล้าย Sprintf:
job . log ( '$%d sent to %s' , amount , user . name ) ;
หรือสิ่งอื่นใด (ใช้ util.inspect () ภายใน):
job . log ( { key : 'some key' , value : 10 } ) ;
job . log ( [ 1 , 2 , 3 , 5 , 8 ] ) ;
job . log ( 10.1 ) ;
ความคืบหน้าของงานมีประโยชน์อย่างยิ่งสำหรับงานที่ดำเนินการมานานเช่นการแปลงวิดีโอ หากต้องการอัปเดตความคืบหน้าของงานเพียงแค่เรียกใช้ job.progress(completed, total [, data])
:
job . progress ( frames , totalFrames ) ;
ข้อมูลสามารถใช้เพื่อส่งข้อมูลเพิ่มเติมเกี่ยวกับงาน ตัวอย่างเช่นข้อความหรือวัตถุที่มีข้อมูลบริบทพิเศษบางอย่างไปยังสถานะปัจจุบัน
กิจกรรมเฉพาะงานจะถูกไล่ออกจากอินสแตนซ์ Job
ผ่าน Redis Pubsub ปัจจุบันมีการสนับสนุนเหตุการณ์ต่อไปนี้:
enqueue
งานตอนนี้เข้าคิวstart
งานกำลังทำงานอยู่promotion
งานได้รับการเลื่อนตำแหน่งจากสถานะล่าช้าไปสู่คิวprogress
ความคืบหน้าของงานตั้งแต่ 0-100failed attempt
งานล้มเหลว แต่ยังมีความพยายามที่เหลืออยู่failed
ล้มเหลวและไม่มีความพยายามที่เหลืออยู่complete
remove
งานออกแล้วตัวอย่างเช่นสิ่งนี้อาจมีลักษณะดังต่อไปนี้:
var job = queue . create ( 'video conversion' , {
title : 'converting loki's to avi'
, user : 1
, frames : 200
} ) ;
job . on ( 'complete' , function ( result ) {
console . log ( 'Job completed with data ' , result ) ;
} ) . on ( 'failed attempt' , function ( errorMessage , doneAttempts ) {
console . log ( 'Job failed' ) ;
} ) . on ( 'failed' , function ( errorMessage ) {
console . log ( 'Job failed' ) ;
} ) . on ( 'progress' , function ( progress , data ) {
console . log ( 'r job #' + job . id + ' ' + progress + '% complete with data ' , data ) ;
} ) ;
โปรดทราบ ว่าเหตุการณ์ระดับงานจะไม่ได้รับการรับประกันว่าจะได้รับการรีสตาร์ทกระบวนการเนื่องจากกระบวนการรีสตาร์ท Node.js จะสูญเสียการอ้างอิงไปยังวัตถุงานที่เฉพาะเจาะจง หากคุณต้องการตัวจัดการเหตุการณ์ที่เชื่อถือได้มากขึ้นมองหากิจกรรมคิว
หมายเหตุ Kue เก็บวัตถุงานในหน่วยความจำจนกว่าพวกเขาจะเสร็จสมบูรณ์/ล้มเหลวเพื่อให้สามารถปล่อยเหตุการณ์ในพวกเขาได้ หากคุณมีงานพร้อมกันอย่างมากในงานที่ไม่สมบูรณ์ให้ปิดคุณสมบัตินี้และใช้กิจกรรมระดับคิวเพื่อการปรับขนาดหน่วยความจำที่ดีขึ้น
kue . createQueue ( { jobEvents : false } )
หรือคุณสามารถใช้ events
ฟังก์ชั่นระดับงานเพื่อควบคุมว่าเหตุการณ์จะถูกไล่ออกสำหรับงานในระดับงานหรือไม่
var job = queue . create ( 'test' ) . events ( false ) . save ( ) ;
เหตุการณ์ระดับคิวให้การเข้าถึงเหตุการณ์ระดับงานที่กล่าวถึงก่อนหน้านี้อย่างไรก็ตามกำหนดขอบเขตของอินสแตนซ์ Queue
เพื่อใช้ตรรกะในระดับ "ทั่วโลก" ตัวอย่างนี้คือการลบงานที่เสร็จสมบูรณ์:
queue . on ( 'job enqueue' , function ( id , type ) {
console . log ( 'Job %s got queued of type %s' , id , type ) ;
} ) . on ( 'job complete' , function ( id , result ) {
kue . Job . get ( id , function ( err , job ) {
if ( err ) return ;
job . remove ( function ( err ) {
if ( err ) throw err ;
console . log ( 'removed completed job #%d' , job . id ) ;
} ) ;
} ) ;
} ) ;
เหตุการณ์ที่มีอยู่นั้นเหมือนกับที่กล่าวไว้ใน "กิจกรรมงาน" อย่างไรก็ตามนำหน้าด้วย "งาน"
งานล่าช้าอาจถูกกำหนดให้เข้าคิวในระยะเวลาตามอำเภอใจโดยการเรียกใช้วิธี .delay(ms)
ผ่านจำนวนมิลลิวินาทีเมื่อเทียบกับ ตอนนี้ หรือคุณสามารถผ่านวัตถุ Date
JavaScript ที่มีเวลาเฉพาะในอนาคต สิ่งนี้จะตั้งค่า Job
โดยอัตโนมัติว่า "ล่าช้า"
var email = queue . create ( 'email' , {
title : 'Account renewal required'
, to : '[email protected]'
, template : 'renewal-email'
} ) . delay ( milliseconds )
. priority ( 'high' )
. save ( ) ;
Kue จะตรวจสอบงานล่าช้าด้วยตัวจับเวลาเพื่อโปรโมตพวกเขาหากเกินความล่าช้าตามกำหนดเวลาจะเริ่มต้นการตรวจสอบงาน 1,000 ตำแหน่งสูงสุดทุกวินาที
งานการประมวลผลนั้นง่ายด้วย Kue ก่อนอื่นสร้างอิน Queue
เหมือนกับที่เราทำเพื่อสร้างงานให้เราเข้าถึง Redis ฯลฯ จากนั้นเรียกใช้ queue.process()
ด้วยประเภทที่เกี่ยวข้อง โปรดทราบว่าไม่เหมือนกับชื่อ createQueue
ที่แนะนำในปัจจุบันมันส่งคืนอินสแตน Queue
ซิงเกิล ดังนั้นคุณสามารถกำหนดค่าและใช้วัตถุ Queue
เดียวภายในกระบวนการ Node.js ของคุณ
ในตัวอย่างต่อไปนี้เราส่งการโทร done
ไปยัง email
เมื่อเกิดข้อผิดพลาดเราเรียก done(err)
เพื่อบอกว่ามีบางอย่างเกิดขึ้นมิฉะนั้นเราเรียก done()
เฉพาะเมื่องานเสร็จสมบูรณ์ หากฟังก์ชั่นนี้ตอบสนองด้วยข้อผิดพลาดจะแสดงใน UI และงานจะถูกทำเครื่องหมายว่าเป็นความล้มเหลว วัตถุข้อผิดพลาดที่ส่งไปยังทำควรเป็น Error
ประเภทมาตรฐาน
var kue = require ( 'kue' )
, queue = kue . createQueue ( ) ;
queue . process ( 'email' , function ( job , done ) {
email ( job . data . to , done ) ;
} ) ;
function email ( address , done ) {
if ( ! isValidEmail ( address ) ) {
//done('invalid to address') is possible but discouraged
return done ( new Error ( 'invalid to address' ) ) ;
}
// email send stuff...
done ( ) ;
}
คนงานยังสามารถส่งผลงานเป็นพารามิเตอร์ที่สองที่ทำ done(null,result)
เพื่อจัดเก็บสิ่งนั้นไว้ใน Job.result
result
จะถูกส่งผ่านตัวจัดการเหตุการณ์ complete
เพื่อให้ผู้ผลิตงานสามารถรับได้หากพวกเขาต้องการ
โดยค่าเริ่มต้นการโทรไปยัง queue.process()
จะรับงานเดียวในเวลาสำหรับการประมวลผล สำหรับงานเล็ก ๆ เช่นการส่งอีเมลสิ่งนี้ไม่เหมาะดังนั้นเราอาจระบุงานที่ใช้งานสูงสุดสำหรับประเภทนี้โดยผ่านหมายเลข:
queue . process ( 'email' , 20 , function ( job , done ) {
// ...
} ) ;
คนงานสามารถหยุดชั่วคราวและดำเนินการต่อกิจกรรมได้ชั่วคราว นั่นคือหลังจากโทร pause
พวกเขาจะไม่ได้รับงานในกระบวนการโทรกลับของพวกเขาจนกว่าจะเรียก resume
ฟังก์ชั่น pause
ปิดเครื่องนี้อย่างสง่างามและใช้ฟังก์ชั่นภายในเช่นเดียวกับวิธี shutdown
ในการปิดตัวลงอย่างสง่างาม
queue . process ( 'email' , function ( job , ctx , done ) {
ctx . pause ( 5000 , function ( err ) {
console . log ( "Worker is paused... " ) ;
setTimeout ( function ( ) { ctx . resume ( ) ; } , 10000 ) ;
} ) ;
} ) ;
หมายเหตุ พารามิเตอร์ ctx
จาก kue >=0.9.0
เป็นอาร์กิวเมนต์ที่สองของฟังก์ชันการเรียกกลับกระบวนการและ done
อย่างแพร่หลายเสมอ
หมายเหตุ ลายเซ็นเมธอด pause
จะเปลี่ยนจาก kue >=0.9.0
เพื่อย้ายฟังก์ชันการโทรกลับไปยังสุดท้าย
สำหรับตัวอย่าง "ของจริง" สมมติว่าเราต้องรวบรวม PDF จากสไลด์จำนวนมากด้วยโหนด-แคนวาส งานของเราอาจประกอบด้วยข้อมูลต่อไปนี้โปรดทราบว่าโดยทั่วไปคุณ ไม่ ควรเก็บข้อมูลขนาดใหญ่ในงานด้วยตนเองจะดีกว่าที่จะจัดเก็บข้อมูลอ้างอิงเช่น IDs ดึงพวกเขาเข้ามาขณะประมวลผล
queue . create ( 'slideshow pdf' , {
title : user . name + "'s slideshow"
, slides : [ ... ] // keys to data stored in redis, mongodb, or some other store
} ) ;
เราสามารถเข้าถึงข้อมูลโดยพลการเดียวกันนี้ภายในกระบวนการแยกต่างหากในขณะที่การประมวลผลผ่านคุณสมบัติ job.data
ในตัวอย่างที่เราแสดงผลแต่ละสไลด์แบบทีละตัวอัปเดตบันทึกและความคืบหน้าของงาน
queue . process ( 'slideshow pdf' , 5 , function ( job , done ) {
var slides = job . data . slides
, len = slides . length ;
function next ( i ) {
var slide = slides [ i ] ; // pretend we did a query on this slide id ;)
job . log ( 'rendering %dx%d slide' , slide . width , slide . height ) ;
renderSlide ( slide , function ( err ) {
if ( err ) return done ( err ) ;
job . progress ( i , len , { nextSlide : i == len ? 'itsdone' : i + 1 } ) ;
if ( i == len ) done ( )
else next ( i + 1 ) ;
} ) ;
}
next ( 0 ) ;
} ) ;
Queue#shutdown([timeout,] fn)
ส่งสัญญาณว่าคนงานทุกคนหยุดการประมวลผลหลังจากงานที่ใช้งานอยู่ในปัจจุบันเสร็จสิ้น คนงานจะรอมิลลิวินาที timeout
สำหรับงานที่ใช้งานอยู่ที่จะถูกเรียกหรือทำเครื่องหมายงานที่ใช้งาน failed
ด้วยเหตุผลข้อผิดพลาดในการปิด เมื่อคนงานทุกคนบอก Kue ว่าพวกเขาหยุด fn
var queue = require ( 'kue' ) . createQueue ( ) ;
process . once ( 'SIGTERM' , function ( sig ) {
queue . shutdown ( 5000 , function ( err ) {
console . log ( 'Kue shutdown: ' , err || '' ) ;
process . exit ( 0 ) ;
} ) ;
} ) ;
โปรดทราบ ว่าลายเซ็นวิธี shutdown
จะเปลี่ยนจาก kue >=0.9.0
เพื่อย้ายฟังก์ชั่นการโทรกลับไปยังล่าสุด
ข้อผิดพลาดทั้งหมดในไลบรารีไคลเอ็นต์ Redis หรือคิวจะถูกปล่อยออกมาในวัตถุ Queue
คุณควรผูกกับเหตุการณ์ error
เพื่อป้องกันข้อยกเว้นที่ไม่ถูกต้องหรือข้อผิดพลาด Kue
var queue = require ( 'kue' ) . createQueue ( ) ;
queue . on ( 'error' , function ( err ) {
console . log ( 'Oops... ' , err ) ;
} ) ;
Kue ทำเครื่องหมายงานที่สมบูรณ์/ล้มเหลวเมื่อ done
โดยคนงานของคุณดังนั้นคุณควรใช้การจัดการข้อผิดพลาดที่เหมาะสมเพื่อป้องกันข้อยกเว้นที่ไม่ถูกต้องในรหัสของคนงานและกระบวนการ Node.js ที่ออกก่อนในการจัดการงาน สิ่งนี้สามารถทำได้ในสองวิธี:
queue . process ( 'my-error-prone-task' , function ( job , done ) {
var domain = require ( 'domain' ) . create ( ) ;
domain . on ( 'error' , function ( err ) {
done ( err ) ;
} ) ;
domain . run ( function ( ) { // your process function
throw new Error ( 'bad things happen' ) ;
done ( ) ;
} ) ;
} ) ;
ข้อสังเกต - โดเมนเลิกใช้งานจาก NodeJs ด้วย ความเสถียร 0 และไม่แนะนำให้ใช้
นี่เป็นทางออกที่นุ่มที่สุดและดีที่สุด แต่ไม่ได้อยู่ใน Kue โปรดดูการสนทนานี้ คุณสามารถแสดงความคิดเห็นเกี่ยวกับฟีเจอร์นี้ในปัญหาเปิด KUE ที่เกี่ยวข้อง
นอกจากนี้คุณยังสามารถใช้สัญญาที่จะทำบางอย่างเช่น
queue . process ( 'my-error-prone-task' , function ( job , done ) {
Promise . method ( function ( ) { // your process function
throw new Error ( 'bad things happen' ) ;
} ) ( ) . nodeify ( done )
} ) ;
แต่สิ่งนี้จะไม่จับข้อยกเว้นในสแต็กการเรียก Async ของคุณเป็นโดเมน
uncaughtException
และปิด Kue อย่างสง่างาม แต่นี่ไม่ใช่ข้อผิดพลาดที่แนะนำในการจัดการสำนวนใน JavaScript เนื่องจากคุณสูญเสียบริบทข้อผิดพลาด process . once ( 'uncaughtException' , function ( err ) {
console . error ( 'Something bad happened: ' , err ) ;
queue . shutdown ( 1000 , function ( err2 ) {
console . error ( 'Kue shutdown result: ' , err2 || 'OK' ) ;
process . exit ( 0 ) ;
} ) ;
} ) ;
ปัจจุบัน Kue ใช้การจัดการสถานะงานด้านลูกค้าและเมื่อ Redis ชนกันในช่วงกลางของการดำเนินงานนั้นงานที่ติดอยู่หรือความไม่สอดคล้องของดัชนีจะเกิดขึ้น ผลที่ตามมาคือจำนวนงานบางอย่างจะติดอยู่และถูกดึงออกมาโดยคนงานเฉพาะเมื่อมีการสร้างงานใหม่หากไม่มีการสร้างงานใหม่อีกต่อไปพวกเขาก็ติดอยู่ตลอดไป ดังนั้นเราขอแนะนำ อย่างยิ่ง ให้คุณเรียกใช้ Watchdog เพื่อแก้ไขปัญหานี้โดยการโทร:
queue . watchStuckJobs ( interval )
interval
อยู่ในมิลลิวินาทีและค่าเริ่มต้นเป็น 1,000ms
KUE จะได้รับการปรับปรุงใหม่เพื่อการจัดการสถานะงานอะตอมอย่างเต็มที่จากเวอร์ชัน 1.0 และสิ่งนี้จะเกิดขึ้นโดยสคริปต์ LUA และ/หรือการรวมกันของ Brpoplpush คุณสามารถอ่านเพิ่มเติมได้ที่นี่และที่นี่
วัตถุคิวมีวิธีการสองประเภทที่จะบอกคุณเกี่ยวกับจำนวนงานในแต่ละรัฐ
queue . inactiveCount ( function ( err , total ) { // others are activeCount, completeCount, failedCount, delayedCount
if ( total > 100000 ) {
console . log ( 'We need some back pressure here' ) ;
}
} ) ;
นอกจากนี้คุณยังสามารถสอบถามเกี่ยวกับประเภทงานที่เฉพาะเจาะจง:
queue . failedCount ( 'my-critical-job' , function ( err , total ) {
if ( total > 10000 ) {
console . log ( 'This is tOoOo bad' ) ;
}
} ) ;
และวนซ้ำรหัสงาน
queue . inactive ( function ( err , ids ) { // others are active, complete, failed, delayed
// you may want to fetch each id to get the Job object out of it...
} ) ;
อย่างไรก็ตามอันที่สองไม่ได้ปรับขนาดเป็นขนาดใหญ่คุณสามารถใช้วิธี Job
แบบคงที่เฉพาะเจาะจงมากขึ้น:
kue . Job . rangeByState ( 'failed' , 0 , n , 'asc' , function ( err , jobs ) {
// you have an array of maximum n Job objects here
} ) ;
หรือ
kue . Job . rangeByType ( 'my-job-type' , 'failed' , 0 , n , 'asc' , function ( err , jobs ) {
// you have an array of maximum n Job objects here
} ) ;
โปรดทราบ ว่าสองวิธีสุดท้ายอาจมีการเปลี่ยนแปลงในรุ่น Kue ในภายหลัง
หากคุณไม่ได้อยู่ข้างบนในส่วนการจัดการข้อผิดพลาดหรือกระบวนการของคุณสูญเสียงานที่ใช้งานอยู่ แต่อย่างใดคุณสามารถกู้คืนจากพวกเขาเมื่อกระบวนการของคุณเริ่มต้นใหม่ ตรรกะตาบอดคือการสร้างงานที่ติดอยู่ทั้งหมดอีกครั้ง:
queue . active ( function ( err , ids ) {
ids . forEach ( function ( id ) {
kue . Job . get ( id , function ( err , job ) {
// Your application should check if job is a stuck one
job . inactive ( ) ;
} ) ;
} ) ;
} ) ;
หมายเหตุ ในการปรับใช้แบบคลัสเตอร์แอปพลิเคชันของคุณควรทราบว่าจะไม่เกี่ยวข้องกับงานที่ถูกต้องโดยคนงานคนอื่น ๆ
ข้อมูลงานและดัชนีการค้นหากินพื้นที่หน่วยความจำ Redis ดังนั้นคุณจะต้องใช้กระบวนการรักษางานในการปรับใช้ในโลกแห่งความเป็นจริง โอกาสครั้งแรกของคุณคือการใช้การกำจัดงานอัตโนมัติเมื่อเสร็จสิ้น
queue . create ( ... ) . removeOnComplete ( true ) . save ( )
แต่ในที่สุดถ้าคุณต้องการข้อมูลงานที่เสร็จสมบูรณ์ในที่สุดคุณสามารถตั้งค่าสคริปต์การกำจัดงานตามความต้องการได้เช่นด้านล่างเพื่อลบงานที่เสร็จสมบูรณ์ Top n
ที่เสร็จสมบูรณ์:
kue . Job . rangeByState ( 'complete' , 0 , n , 'asc' , function ( err , jobs ) {
jobs . forEach ( function ( job ) {
job . remove ( function ( ) {
console . log ( 'removed ' , job . id ) ;
} ) ;
} ) ;
} ) ;
โปรดทราบ ว่าคุณควรให้เวลามากพอสำหรับการเรียกใช้การโทร .remove
ในแต่ละวัตถุงานเพื่อให้เสร็จสมบูรณ์ก่อนที่กระบวนการของคุณจะออกหรือดัชนีงานจะรั่วไหล
โดยค่าเริ่มต้น Kue จะเชื่อมต่อกับ Redis โดยใช้การตั้งค่าเริ่มต้นของไคลเอ็นต์ (ค่าเริ่มต้นพอร์ตเป็น 6379
, ค่าเริ่มต้นโฮสต์เป็น 127.0.0.1
, คำนำหน้าค่าเริ่มต้นเป็น q
) Queue#createQueue(options)
ยอมรับตัวเลือกการเชื่อมต่อ REDIS ใน options.redis
รหัส REDIS
var kue = require ( 'kue' ) ;
var q = kue . createQueue ( {
prefix : 'q' ,
redis : {
port : 1234 ,
host : '10.0.50.20' ,
auth : 'password' ,
db : 3 , // if provided select a non-default redis db
options : {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
} ) ;
คำนำ prefix
ควบคุมชื่อคีย์ที่ใช้ใน Redis โดยค่าเริ่มต้นนี่เป็นเพียง q
คำนำหน้าโดยทั่วไปไม่ควรเปลี่ยนแปลงเว้นแต่คุณจะต้องใช้หนึ่งอินสแตนซ์ Redis หนึ่งอินสแตนซ์สำหรับหลายแอพ นอกจากนี้ยังมีประโยชน์สำหรับการให้การทดสอบแยกต่างหากในแอปพลิเคชันหลักของคุณ
คุณยังสามารถระบุข้อมูลการเชื่อมต่อเป็นสตริง URL
var q = kue . createQueue ( {
redis : 'redis://example.com:1234?redis_option=value&redis_option=value'
} ) ;
เนื่องจาก Node_Redis รองรับซ็อกเก็ตโดเมน Unix คุณสามารถบอก Kue ให้ทำเช่นนั้นได้ ดู Unix-Domain-Socket สำหรับการกำหนดค่าเซิร์ฟเวอร์ Redis ของคุณ
var kue = require ( 'kue' ) ;
var q = kue . createQueue ( {
prefix : 'q' ,
redis : {
socket : '/data/sockets/redis.sock' ,
auth : 'password' ,
options : {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
} ) ;
ไลบรารีไคลเอนต์ Redis node.js ใด ๆ ที่สอดคล้อง (หรือเมื่อดัดแปลง) เป็น node_redis API สามารถฉีดเข้าไปใน Kue ได้ คุณควรจัดเตรียมฟังก์ชั่น createClientFactory
เป็นโรงงานเชื่อมต่อ Redis แทนการให้ตัวเลือกการเชื่อมต่อ Node_Redis
ด้านล่างนี้เป็นรหัสตัวอย่างเพื่อเปิดใช้งาน Redis-Sentinel เพื่อเชื่อมต่อกับ Redis Sentinel สำหรับการล้มเหลวของ Master/Slave อัตโนมัติ
var kue = require ( 'kue' ) ;
var Sentinel = require ( 'redis-sentinel' ) ;
var endpoints = [
{ host : '192.168.1.10' , port : 6379 } ,
{ host : '192.168.1.11' , port : 6379 }
] ;
var opts = options || { } ; // Standard node_redis client options
var masterName = 'mymaster' ;
var sentinel = Sentinel . Sentinel ( endpoints ) ;
var q = kue . createQueue ( {
redis : {
createClientFactory : function ( ) {
return sentinel . createClient ( masterName , opts ) ;
}
}
} ) ;
โปรดทราบ ว่ารหัสไคลเอ็นต์ <0.8.x
ทั้งหมดควรได้รับการ refactored เพื่อส่งตัวเลือก Redis ไปยัง Queue#createQueue
แทนที่จะเป็นรูปแบบแพทช์ลิงที่เอาชนะ redis#createClient
หรือพวกเขาจะถูกทำลายจาก Kue 0.8.x
var Redis = require ( 'ioredis' ) ;
var kue = require ( 'kue' ) ;
// using https://github.com/72squared/vagrant-redis-cluster
var queue = kue . createQueue ( {
redis : {
createClientFactory : function ( ) {
return new Redis . Cluster ( [ {
port : 7000
} , {
port : 7001
} ] ) ;
}
}
} ) ;
UI เป็นแอปพลิเคชั่นด่วนขนาดเล็ก สคริปต์มีให้ใน bin/
สำหรับการรันอินเทอร์เฟซเป็นแอปพลิเคชันแบบสแตนด์อโลนที่มีการตั้งค่าเริ่มต้น คุณอาจส่งผ่านตัวเลือกสำหรับพอร์ต Redis-URL และคำนำหน้า ตัวอย่างเช่น:
node_modules/kue/bin/kue-dashboard -p 3050 -r redis://127.0.0.1:3000 -q prefix
คุณสามารถยิงได้จากภายในแอปพลิเคชันอื่นเช่นกัน:
var kue = require ( 'kue' ) ;
kue . createQueue ( ... ) ;
kue . app . listen ( 3000 ) ;
ชื่อเริ่มต้นเป็น "kue" เพื่อเปลี่ยนการเรียกใช้นี้:
kue . app . set ( 'title' , 'My Application' ) ;
โปรดทราบ ว่าหากคุณใช้ตัวเลือก Kue ที่ไม่ใช่ Default kue.createQueue(...)
จะต้องเรียกก่อนที่จะเข้าถึง kue.app
นอกจากนี้คุณยังสามารถใช้ Kue-Ui Web Interface ที่สนับสนุนโดย Arnaud Bénard
พร้อมกับ UI Kue ยังเผยให้เห็น JSON API ซึ่งใช้โดย UI
Query Jobs ตัวอย่างเช่น "รับ /งาน /ค้นหา? q = วิดีโอ avi":
[ "5" , "7" , "10" ]
โดยค่าเริ่มต้น KUE ดัชนีวัตถุข้อมูลงานทั้งหมดสำหรับการค้นหา แต่สามารถปรับแต่งได้ผ่านการโทรหา Job#searchKeys
เพื่อบอก Kue ว่าปุ่มใดในข้อมูลงานเพื่อสร้างดัชนีสำหรับ:
var kue = require ( 'kue' ) ;
queue = kue . createQueue ( ) ;
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . searchKeys ( [ 'to' , 'title' ] ) . save ( ) ;
คุณลักษณะการค้นหาถูกปิดโดยค่าเริ่มต้นจาก kue >=0.9.0
อ่านเพิ่มเติมเกี่ยวกับเรื่องนี้ที่นี่ คุณควรเปิดใช้งานดัชนีการค้นหาและเพิ่มสีแดงในการพึ่งพาของคุณหากคุณต้องการ:
var kue = require ( 'kue' ) ;
q = kue . createQueue ( {
disableSearch : false
} ) ;
npm install reds --save
ปัจจุบันตอบสนองด้วยการนับรัฐและเวลากิจกรรมของคนงานในมิลลิวินาที:
{ "inactiveCount" : 4 , "completeCount" : 69 , "activeCount" : 2 , "failedCount" : 0 , "workTime" : 20892 }
รับงานโดย :id
:
{ "id" : "3" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : "100" , "state" : "complete" , "attempts" : null , "created_at" : "1309973155248" , "updated_at" : "1309973155248" , "duration" : "15002" }
รับงาน :id
:
[ 'foo' , 'bar' , 'baz' ]
รับงานที่มีช่วงที่ระบุ :from
ถึง :to
เช่น "/jobs/0..2", ที่ไหน :order
อาจเป็น "asc" หรือ "desc":
[ { "id" : "12" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : 0 , "state" : "active" , "attempts" : null , "created_at" : "1309973299293" , "updated_at" : "1309973299293" } , { "id" : "130" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : 0 , "state" : "active" , "attempts" : null , "created_at" : "1309975157291" , "updated_at" : "1309975157291" } ]
เช่นเดียวกับข้างต้นการ จำกัด โดย :state
ซึ่งเป็นหนึ่งใน:
- active
- inactive
- failed
- complete
เช่นเดียวกับข้างต้นอย่างไรก็ตาม จำกัด :type
และ :state
ลบงาน :id
:
$ curl -X DELETE http://local:3000/job/2
{"message":"job 2 removed"}
สร้างงาน:
$ curl -H "Content-Type: application/json" -X POST -d
'{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "[email protected]",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
}' http://localhost:3000/job
{"message": "job created", "id": 3}
คุณสามารถสร้างงานหลายงานพร้อมกันโดยผ่านอาร์เรย์ ในกรณีนี้การตอบสนองจะเป็นอาร์เรย์ด้วยเช่นกันเพื่อรักษาคำสั่งซื้อ:
$ curl -H "Content-Type: application/json" -X POST -d
'[{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "[email protected]",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
},
{
"type": "email",
"data": {
"title": "followup email for tj",
"to": "[email protected]",
"template": "followup-email"
},
"options" : {
"delay": 86400,
"attempts": 5,
"priority": "high"
}
}]' http://localhost:3000/job
[
{"message": "job created", "id": 4},
{"message": "job created", "id": 5}
]
หมายเหตุ: เมื่อแทรกงานหลายงานเป็นจำนวนมากหากการแทรกหนึ่งล้มเหลว Kue จะประมวลผลงานที่เหลืออยู่ตามลำดับ อาร์เรย์การตอบกลับจะมี IDS ของงานที่เพิ่มสำเร็จและองค์ประกอบที่ล้มเหลวใด ๆ จะเป็นวัตถุที่อธิบายข้อผิดพลาด: {"error": "error reason"}
ตัวอย่างด้านล่างแสดงวิธีที่คุณอาจใช้คลัสเตอร์เพื่อกระจายโหลดการประมวลผลงานข้าม CPU โปรดดูเอกสารของคลัสเตอร์โมดูลสำหรับตัวอย่างรายละเอียดเพิ่มเติมเกี่ยวกับการใช้งาน
เมื่อคลัสเตอร์ .isMaster
ไฟล์จะถูกดำเนินการในบริบทของกระบวนการหลักซึ่งในกรณีนี้คุณสามารถทำงานที่คุณต้องการเพียงครั้งเดียวเช่นการเริ่มต้นแอปแอพที่รวมกับ Kue ตรรกะในบล็อก else
ดำเนินการ ต่อคนงาน
var kue = require ( 'kue' )
, cluster = require ( 'cluster' )
, queue = kue . createQueue ( ) ;
var clusterWorkerSize = require ( 'os' ) . cpus ( ) . length ;
if ( cluster . isMaster ) {
kue . app . listen ( 3000 ) ;
for ( var i = 0 ; i < clusterWorkerSize ; i ++ ) {
cluster . fork ( ) ;
}
} else {
queue . process ( 'email' , 10 , function ( job , done ) {
var pending = 5
, total = pending ;
var interval = setInterval ( function ( ) {
job . log ( 'sending!' ) ;
job . progress ( total - pending , total ) ;
-- pending || done ( ) ;
pending || clearInterval ( interval ) ;
} , 1000 ) ;
} ) ;
}
สิ่งนี้จะสร้างตัวประมวลผลงาน email
(ผู้ปฏิบัติงาน) ต่อแต่ละแกน CPU ของเครื่อง CPU ของคุณโดยแต่ละรายการคุณสามารถจัดการงานอีเมลพร้อมกันได้ 10 ตำแหน่งซึ่งนำไปสู่งานอีเมลพร้อมกัน 10 * N
พร้อมกันที่ประมวลผลในเครื่อง N
ของคุณ
ตอนนี้เมื่อคุณเยี่ยมชม UI ของ Kue ในเบราว์เซอร์คุณจะเห็นว่างานกำลังดำเนินการเร็วขึ้นประมาณ N
เท่า! (ถ้าคุณมี N
คอร์)
ผ่านการใช้แอพคุณสามารถปรับแต่งเว็บแอปพลิเคชันเปิดใช้งาน TLS หรือเพิ่มมิดเดิลแวร์เพิ่มเติมเช่น basic-auth-connect
$ npm install --save basic-auth-connect
var basicAuth = require ( 'basic-auth-connect' ) ;
var app = express . createServer ( { ... tls options ... } ) ;
app . use ( basicAuth ( 'foo' , 'bar' ) ) ;
app . use ( kue . app ) ;
app . listen ( 3000 ) ;
เปิดใช้งานโหมดทดสอบเพื่อผลักดันงานทั้งหมดให้เป็นอาร์เรย์ jobs
ทำการยืนยันกับงานในอาร์เรย์นั้นเพื่อให้แน่ใจว่ารหัสภายใต้การทดสอบเป็นงานที่ถูกต้องอย่างถูกต้อง
queue = require ( 'kue' ) . createQueue ( ) ;
before ( function ( ) {
queue . testMode . enter ( ) ;
} ) ;
afterEach ( function ( ) {
queue . testMode . clear ( ) ;
} ) ;
after ( function ( ) {
queue . testMode . exit ( )
} ) ;
it ( 'does something cool' , function ( ) {
queue . createJob ( 'myJob' , { foo : 'bar' } ) . save ( ) ;
queue . createJob ( 'anotherJob' , { baz : 'bip' } ) . save ( ) ;
expect ( queue . testMode . jobs . length ) . to . equal ( 2 ) ;
expect ( queue . testMode . jobs [ 0 ] . type ) . to . equal ( 'myJob' ) ;
expect ( queue . testMode . jobs [ 0 ] . data ) . to . eql ( { foo : 'bar' } ) ;
} ) ;
สำคัญ: โดยเริ่มต้นงานจะไม่ถูกประมวลผลเมื่อสร้างขึ้นในโหมดทดสอบ คุณสามารถเปิดใช้งานการประมวลผลงานได้โดยส่งจริงไปยัง testmode.enter
before ( function ( ) {
queue . testMode . enter ( true ) ;
} ) ;
เรารักการมีส่วนร่วม!
เมื่อมีส่วนร่วมให้ปฏิบัติตามกฎง่ายๆ:
(ใบอนุญาต MIT)
ลิขสิทธิ์ (c) 2011 LearnBoost <[email protected]>
ได้รับอนุญาตโดยไม่เสียค่าใช้จ่ายสำหรับบุคคลใด ๆ ที่ได้รับสำเนาซอฟต์แวร์นี้และไฟล์เอกสารที่เกี่ยวข้อง ('ซอฟต์แวร์') เพื่อจัดการในซอฟต์แวร์โดยไม่มีการ จำกัด รวมถึง แต่ไม่ จำกัด เฉพาะสิทธิ์ในการใช้คัดลอกแก้ไขผสาน เผยแพร่แจกจ่าย sublicense และ/หรือขายสำเนาของซอฟต์แวร์และอนุญาตให้บุคคลที่ซอฟต์แวร์ได้รับการตกแต่งให้ทำเช่นนั้นภายใต้เงื่อนไขดังต่อไปนี้:
ประกาศลิขสิทธิ์ข้างต้นและประกาศการอนุญาตนี้จะรวมอยู่ในสำเนาทั้งหมดหรือส่วนสำคัญของซอฟต์แวร์
ซอฟต์แวร์มีให้ 'ตามที่เป็นอยู่' โดยไม่มีการรับประกันใด ๆ ไม่ว่าโดยชัดแจ้งหรือโดยนัยรวมถึง แต่ไม่ จำกัด เพียงการรับประกันความสามารถในการค้าการออกกำลังกายเพื่อวัตถุประสงค์เฉพาะและการไม่เข้าร่วม ไม่ว่าในกรณีใดผู้เขียนหรือผู้ถือลิขสิทธิ์จะต้องรับผิดชอบต่อการเรียกร้องความเสียหายหรือความรับผิดอื่น ๆ ไม่ว่าจะเป็นการกระทำของสัญญาการละเมิดหรืออื่น ๆ ซอฟต์แวร์.