يرجى الاطلاع على مثل الثور كبديل. شكرًا لك!
Kue عبارة عن قائمة انتظار ذات أولوية مدعومة بـ Redis ، تم تصميمها لـ Node.js.
Protip هذا هو أحدث وثائق Kue ، تأكد من قراءة Changelist أيضًا.
آخر إصدار:
$ npm install kue
فرع الماجستير:
$ npm install http://github.com/Automattic/kue/tarball/master
قم أولاً بإنشاء Queue
وظيفة مع kue.createQueue()
:
var kue = require ( 'kue' )
, queue = kue . createQueue ( ) ;
save()
queue.create()
Job
تقبل طريقة save()
اختياريًا رد الاتصال ، ويستجيب error
إذا حدث خطأ ما. مفتاح title
مغلف بشكل خاص ، وسيتم عرضه في قوائم الوظائف داخل واجهة المستخدم ، مما يسهل العثور على وظيفة محددة.
var job = queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . save ( function ( err ) {
if ( ! err ) console . log ( job . id ) ;
} ) ;
لتحديد أولوية الوظيفة ، ما عليك سوى استدعاء طريقة priority()
برقم أو اسم الأولوية ، والذي يتم تعيينه على رقم.
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . priority ( 'high' ) . save ( ) ;
خريطة الأولوية الافتراضية هي كما يلي:
{
low : 10
, normal : 0
, medium : - 5
, high : - 10
, critical : - 15
} ;
بشكل افتراضي ، لا تملك سوى محاولة واحدة ، فهذا عندما يفشلون ، يتم تمييزها على أنها فشل ، وتبقى على هذا النحو حتى تتدخل. ومع ذلك ، يسمح لك Kue بتحديد هذا ، وهو أمر مهم بالنسبة للوظائف مثل نقل بريد إلكتروني ، والذي قد يتم إعادة محاكمته عند الفشل دون مشكلة. للقيام بذلك استدعاء طريقة .attempts()
مع عدد.
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . priority ( 'high' ) . attempts ( 5 ) . save ( ) ;
تتم محاولات إعادة محاولة الوظائف بمجرد فشلها ، دون تأخير ، حتى لو كان وظيفتك قد تم تعيين تأخير عبر Job#delay
. إذا كنت ترغب في تأخير إعادة الاعتراض على الوظيفة عند الإخفاقات (المعروفة باسم Backoff) ، يمكنك استخدام طريقة Job#backoff
بطرق مختلفة:
// Honor job's original delay (if set) at each attempt, defaults to fixed backoff
job . attempts ( 3 ) . backoff ( true )
// Override delay value, fixed backoff
job . attempts ( 3 ) . backoff ( { delay : 60 * 1000 , type : 'fixed' } )
// Enable exponential backoff using original delay (if set)
job . attempts ( 3 ) . backoff ( { type : 'exponential' } )
// Use a function to get a customized next attempt delay value
job . attempts ( 3 ) . backoff ( function ( attempts , delay ) {
//attempts will correspond to the nth attempt failure so it will start with 0
//delay will be the amount of the last delay, not the initial delay unless attempts === 0
return my_customized_calculated_delay ;
} )
في السيناريو الأخير ، سيتم تنفيذ الوظيفة المتوفرة (عبر eval) على كل إعادة تراجع للحصول على قيمة تأخير المحاولة التالية ، مما يعني أنه لا يمكنك الرجوع إلى متغيرات خارجية/سياق داخلها.
يمكن أن يقوم منتجو الوظائف بتعيين قيمة انتهاء الصلاحية للوقت الذي يمكن أن يعيش فيه وظيفتهم في حالة نشطة ، بحيث إذا لم يرد العمال في الوقت المناسب ، ستفشل Kue مع TTL exceeded
رسالة الخطأ التي تمنع تلك الوظيفة من التعرض للوقوع في حالة نشطة وإفساد التزامن.
queue . create ( 'email' , { title : 'email job with TTL' } ) . ttl ( milliseconds ) . save ( ) ;
تمكنك سجلات الوظائف الخاصة بك من فضح المعلومات إلى واجهة المستخدم في أي وقت من العمر في الوظيفة. للقيام بذلك ببساطة استدعاء job.log()
، الذي يقبل سلسلة الرسائل بالإضافة إلى الحجج المتغيرة للدعم الشبيه بـ SprintF:
job . log ( '$%d sent to %s' , amount , user . name ) ;
أو أي شيء آخر (يستخدم util.inspect () داخليًا):
job . log ( { key : 'some key' , value : 10 } ) ;
job . log ( [ 1 , 2 , 3 , 5 , 8 ] ) ;
job . log ( 10.1 ) ;
تقدم الوظيفة مفيد للغاية للوظائف طويلة الأمد مثل تحويل الفيديو. لتحديث تقدم الوظيفة ببساطة استدعاء job.progress(completed, total [, data])
:
job . progress ( frames , totalFrames ) ;
يمكن استخدام البيانات لتمرير معلومات إضافية حول الوظيفة. على سبيل المثال ، رسالة أو كائن مع بعض البيانات السياقية الإضافية إلى الحالة الحالية.
يتم إطلاق الأحداث الخاصة بالوظيفة على مثيلات Job
عبر Redis PubSub. الأحداث التالية مدعومة حاليًا:
enqueue
المهمة الآن في قائمة الانتظارstart
المهمة تعمل الآنpromotion
يتم الترويج للوظيفة من تأخر الدولة إلى قائمة الانتظارprogress
تقدم الوظيفة الذي يتراوح بين 0-100failed attempt
قد فشلت الوظيفة ، ولكنها لم تحاول حتى الآنfailed
المهمة فشلت وليس لديها محاولات متبقيةcomplete
المهمة قد اكتملremove
المهمةعلى سبيل المثال ، قد يبدو هذا مثل ما يلي:
var job = queue . create ( 'video conversion' , {
title : 'converting loki's to avi'
, user : 1
, frames : 200
} ) ;
job . on ( 'complete' , function ( result ) {
console . log ( 'Job completed with data ' , result ) ;
} ) . on ( 'failed attempt' , function ( errorMessage , doneAttempts ) {
console . log ( 'Job failed' ) ;
} ) . on ( 'failed' , function ( errorMessage ) {
console . log ( 'Job failed' ) ;
} ) . on ( 'progress' , function ( progress , data ) {
console . log ( 'r job #' + job . id + ' ' + progress + '% complete with data ' , data ) ;
} ) ;
لاحظ أن أحداث مستوى الوظيفة ليست مضمونة عند إعادة تشغيل عملية إعادة التشغيل ، نظرًا لأن عملية إعادة تشغيل Node.js ستفقد الإشارة إلى كائن الوظيفة المحدد. إذا كنت تريد معالج أحداث أكثر موثوقية البحث عن أحداث قائمة الانتظار.
لاحظ Kue تخزن كائنات الوظيفة في الذاكرة حتى يتم إكمالها/فشلت في أن تكون قادرة على إصدار الأحداث عليها. إذا كان لديك تزامن كبير في وظائف غير مكتملة ، فقم بإيقاف تشغيل هذه الميزة واستخدم أحداث مستوى قائمة الانتظار لتحديد تحجيم الذاكرة بشكل أفضل.
kue . createQueue ( { jobEvents : false } )
بدلاً من ذلك ، يمكنك استخدام events
وظيفة مستوى الوظيفة للتحكم في ما إذا كانت الأحداث قد تم إطلاقها لوظيفة على مستوى الوظيفة.
var job = queue . create ( 'test' ) . events ( false ) . save ( ) ;
توفر أحداث مستوى قائمة الانتظار الوصول إلى الأحداث على مستوى الوظيفة المذكورة سابقًا ، ولكن تم تحديدها إلى مثيل Queue
لتطبيق المنطق على المستوى "العالمي". مثال على ذلك هو إزالة الوظائف المكتملة:
queue . on ( 'job enqueue' , function ( id , type ) {
console . log ( 'Job %s got queued of type %s' , id , type ) ;
} ) . on ( 'job complete' , function ( id , result ) {
kue . Job . get ( id , function ( err , job ) {
if ( err ) return ;
job . remove ( function ( err ) {
if ( err ) throw err ;
console . log ( 'removed completed job #%d' , job . id ) ;
} ) ;
} ) ;
} ) ;
الأحداث المتاحة هي نفسها المذكورة في "أحداث الوظائف" ، ولكن مسبوقة بـ "Job".
قد يتم تحديد موقع الوظائف المتأخرة للوصول إلى مسافة تعسفية في الوقت المناسب من خلال استدعاء طريقة .delay(ms)
، مرور عدد المللي ثانية بالنسبة إلى الآن . بدلاً من ذلك ، يمكنك تمرير كائن Date
JavaScript مع وقت محدد في المستقبل. هذا يعلق تلقائيا Job
على أنها "تأخر".
var email = queue . create ( 'email' , {
title : 'Account renewal required'
, to : '[email protected]'
, template : 'renewal-email'
} ) . delay ( milliseconds )
. priority ( 'high' )
. save ( ) ;
ستقوم Kue بالتحقق من الوظائف المتأخرة مع مؤقت ، وترويجها إذا تم تجاوز التأخير المجدولة ، مما يتخلف عن فحص أفضل 1000 وظيفة كل ثانية.
معالجة الوظائف بسيطة مع Kue. قم أولاً بإنشاء مثيل Queue
مثلما نفعله لإنشاء فرص عمل ، وتوفير إمكانية الوصول إلى redis وما إلى ذلك ، ثم استدعاء قائمة queue.process()
. لاحظ أنه بخلاف ما يقترحه الاسم createQueue
، فإنه يعيد حاليًا مثيل قائمة Queue
Singleton. حتى تتمكن من تكوين واستخدام كائن Queue
واحد فقط ضمن عملية Node.js.
في المثال التالي ، نقوم بتمرير رد الاتصال done
على email
، عندما يحدث خطأ ، نستدعي done(err)
لإخبار Kue بحدث شيء ما ، وإلا فإننا نستدعي done()
فقط عند اكتمال المهمة. إذا كانت هذه الوظيفة تستجيب بخطأ ، فسيتم عرضها في واجهة المستخدم وسيتم تمييز المهمة كفشل. كائن الخطأ الذي تم تمريره إلى القيام به ، يجب أن يكون Error
من النوع القياسي.
var kue = require ( 'kue' )
, queue = kue . createQueue ( ) ;
queue . process ( 'email' , function ( job , done ) {
email ( job . data . to , done ) ;
} ) ;
function email ( address , done ) {
if ( ! isValidEmail ( address ) ) {
//done('invalid to address') is possible but discouraged
return done ( new Error ( 'invalid to address' ) ) ;
}
// email send stuff...
done ( ) ;
}
يمكن للعمال أيضًا نقل النتائج الوظيفية باعتبارها المعلمة الثانية التي يتم done(null,result)
لتخزين ذلك في Job.result
Key. يتم تمرير result
أيضًا من خلال معالجات الأحداث complete
حتى يتمكن منتجو الوظائف من استلامها إذا كانوا يرغبون في ذلك.
بشكل افتراضي queue.process()
. بالنسبة للمهام الصغيرة مثل إرسال رسائل البريد الإلكتروني ، هذا ليس مثاليًا ، لذلك قد نحدد الحد الأقصى للوظائف النشطة لهذا النوع عن طريق تمرير رقم:
queue . process ( 'email' , 20 , function ( job , done ) {
// ...
} ) ;
يمكن للعمال التوقف مؤقتًا واستئناف نشاطهم. وهذا هو ، بعد استدعاء pause
، لن يتلقوا أي وظائف في عملية رد الاتصال الخاصة بهم حتى يتم استدعاء resume
. تقوم وظيفة pause
بإغلاق هذا العامل بأمان ، وتستخدم نفس الوظيفة الداخلية مثل طريقة shutdown
في الإغلاق الرشيق.
queue . process ( 'email' , function ( job , ctx , done ) {
ctx . pause ( 5000 , function ( err ) {
console . log ( "Worker is paused... " ) ;
setTimeout ( function ( ) { ctx . resume ( ) ; } , 10000 ) ;
} ) ;
} ) ;
لاحظ أن معلمة ctx
من Kue >=0.9.0
هي الوسيطة الثانية لوظيفة رد الاتصال على العملية done
دائمًا ما هو الأخير
ملاحظة يتم تغيير توقيع طريقة pause
من Kue >=0.9.0
لتحريك وظيفة رد الاتصال إلى الأخير.
للحصول على مثال "حقيقي" ، دعنا نقول أننا بحاجة إلى تجميع ملف PDF من العديد من الشرائح باستخدام Node-Canvas. قد تتكون مهمتنا من البيانات التالية ، لاحظ أنه يجب عليك بشكل عام تخزين بيانات كبيرة في الوظيفة الذاتية ، من الأفضل تخزين المراجع مثل IDS ، وسحبها أثناء المعالجة.
queue . create ( 'slideshow pdf' , {
title : user . name + "'s slideshow"
, slides : [ ... ] // keys to data stored in redis, mongodb, or some other store
} ) ;
يمكننا الوصول إلى هذه البيانات التعسفية نفسها ضمن عملية منفصلة أثناء المعالجة ، عبر خاصية job.data
. في المثال ، نقوم بإعداد كل شريحة واحدة تلو الأخرى ، ونحدث سجل الوظيفة والتقدم.
queue . process ( 'slideshow pdf' , 5 , function ( job , done ) {
var slides = job . data . slides
, len = slides . length ;
function next ( i ) {
var slide = slides [ i ] ; // pretend we did a query on this slide id ;)
job . log ( 'rendering %dx%d slide' , slide . width , slide . height ) ;
renderSlide ( slide , function ( err ) {
if ( err ) return done ( err ) ;
job . progress ( i , len , { nextSlide : i == len ? 'itsdone' : i + 1 } ) ;
if ( i == len ) done ( )
else next ( i + 1 ) ;
} ) ;
}
next ( 0 ) ;
} ) ;
Queue#shutdown([timeout,] fn)
يشير جميع العمال إلى التوقف عن المعالجة بعد القيام بعملهم النشط الحالي. سيقوم العمال بانتظار timeout
milliseconds لقيامهم النشط بالاستدعاء أو وضع علامة على failed
النشطة مع سبب خطأ الإغلاق. عندما يخبر جميع العمال Kue أنهم يتوقفون عن fn
.
var queue = require ( 'kue' ) . createQueue ( ) ;
process . once ( 'SIGTERM' , function ( sig ) {
queue . shutdown ( 5000 , function ( err ) {
console . log ( 'Kue shutdown: ' , err || '' ) ;
process . exit ( 0 ) ;
} ) ;
} ) ;
لاحظ أنه يتم تغيير توقيع طريقة shutdown
من Kue >=0.9.0
لتحريك وظيفة رد الاتصال إلى الأخير.
جميع الأخطاء إما في مكتبة عميل Redis أو قائمة انتظارها تنبعث من كائن Queue
. يجب أن ترتبط بأحداث error
لمنع استثناءات غير معلوم أو أخطاء Kue.
var queue = require ( 'kue' ) . createQueue ( ) ;
queue . on ( 'error' , function ( err ) {
console . log ( 'Oops... ' , err ) ;
} ) ;
تمثل Kue مهمة كاملة/فشل عند done
من قبل العامل ، لذلك يجب عليك استخدام معالجة الأخطاء المناسبة لمنع الاستثناءات غير المطلوبة في رمز العامل الخاص بك وعملية Node.js التي تخرج قبل أن يتم تنفيذ وظائف المقبض. يمكن تحقيق ذلك بطريقتين:
queue . process ( 'my-error-prone-task' , function ( job , done ) {
var domain = require ( 'domain' ) . create ( ) ;
domain . on ( 'error' , function ( err ) {
done ( err ) ;
} ) ;
domain . run ( function ( ) { // your process function
throw new Error ( 'bad things happen' ) ;
done ( ) ;
} ) ;
} ) ;
إشعار - يتم إهمال المجالات من NodeJs مع الاستقرار 0 ولا ينصح باستخدامها.
هذا هو الحل أنعم وأفضل ، ولكن ليس مدمجًا مع Kue. يرجى الرجوع إلى هذه المناقشة. يمكنك التعليق على هذه الميزة في قضية Open Kue ذات الصلة.
يمكنك أيضًا استخدام الوعود لفعل شيء مثل
queue . process ( 'my-error-prone-task' , function ( job , done ) {
Promise . method ( function ( ) { // your process function
throw new Error ( 'bad things happen' ) ;
} ) ( ) . nodeify ( done )
} ) ;
ولكن هذا لن يلتقط استثناءات في مكدس استدعاء Async كما تفعل المجالات.
uncaughtException
وإغلاق Kue بأمان ، ومع ذلك ، فإن هذا ليس خطأً موصى به في التعامل مع المصطلح في JavaScript لأنك تفقد سياق الخطأ. process . once ( 'uncaughtException' , function ( err ) {
console . error ( 'Something bad happened: ' , err ) ;
queue . shutdown ( 1000 , function ( err2 ) {
console . error ( 'Kue shutdown result: ' , err2 || 'OK' ) ;
process . exit ( 0 ) ;
} ) ;
} ) ;
تستخدم KUE حاليًا إدارة حالة عمل عميل جانبي ، وعندما تتعطل Redis في منتصف تلك العمليات ، ستحدث بعض الوظائف المتعلقة بالوظائف أو التناقضات في الفهرس. والنتيجة هي أن عدد معين من الوظائف سيكون عالقًا ، ويتم سحبه من قبل العامل فقط عند إنشاء وظائف جديدة ، إذا لم يتم إنشاء وظائف جديدة ، فهي عالقة إلى الأبد. لذلك نقترح بشدة أن تقوم بتشغيل WatchDog لإصلاح هذه المشكلة عن طريق الاتصال:
queue . watchStuckJobs ( interval )
interval
بالمللي ثانية ويتخلف عن السداد إلى 1000ms
سيتم إعادة تمهيد Kue لإدارة حالة الوظائف الذرية بالكامل من الإصدار 1.0 وسيحدث هذا بواسطة البرامج النصية LUA و/أو مجموعة BRPOPLPUSH. يمكنك قراءة المزيد هنا وهنا.
يحتوي كائن قائمة الانتظار على نوعان من الطرق لإخباركم عن عدد الوظائف في كل ولاية
queue . inactiveCount ( function ( err , total ) { // others are activeCount, completeCount, failedCount, delayedCount
if ( total > 100000 ) {
console . log ( 'We need some back pressure here' ) ;
}
} ) ;
يمكنك أيضًا الاستعلام عن نوع عمل معين:
queue . failedCount ( 'my-critical-job' , function ( err , total ) {
if ( total > 10000 ) {
console . log ( 'This is tOoOo bad' ) ;
}
} ) ;
والتكرار على معرفات الوظيفة
queue . inactive ( function ( err , ids ) { // others are active, complete, failed, delayed
// you may want to fetch each id to get the Job object out of it...
} ) ;
ومع ذلك ، فإن الثانية لا تتوسع في عمليات النشر الكبيرة ، يمكنك استخدام طرق أكثر تحديداً Job
:
kue . Job . rangeByState ( 'failed' , 0 , n , 'asc' , function ( err , jobs ) {
// you have an array of maximum n Job objects here
} ) ;
أو
kue . Job . rangeByType ( 'my-job-type' , 'failed' , 0 , n , 'asc' , function ( err , jobs ) {
// you have an array of maximum n Job objects here
} ) ;
لاحظ أن آخر طريقتين عرضة للتغيير في إصدارات KUE اللاحقة.
إذا لم تفعل أي شيء أعلاه في قسم معالجة الأخطاء أو فقدان عمليتك في الوظائف النشطة بأي شكل من الأشكال ، فيمكنك التعافي منها عند إعادة تشغيل العملية. سيكون المنطق الأعمى هو إعادة توصيل جميع الوظائف المعلقة:
queue . active ( function ( err , ids ) {
ids . forEach ( function ( id ) {
kue . Job . get ( id , function ( err , job ) {
// Your application should check if job is a stuck one
job . inactive ( ) ;
} ) ;
} ) ;
} ) ;
ملاحظة في النشر المجمع ، يجب أن يكون طلبك على دراية بعدم إشراك وظيفة صالحة ، حاليًا من قبل العمال الآخرين.
بيانات الوظائف وفهارس البحث تأكل مساحة ذاكرة redis ، لذلك ستحتاج إلى بعض عمليات حفظ الوظائف في عمليات النشر في العالم الحقيقي. فرصتك الأولى هي استخدام إزالة الوظائف التلقائية عند الانتهاء.
queue . create ( ... ) . removeOnComplete ( true ) . save ( )
ولكن إذا كنت بحاجة إلى بيانات الوظائف المكتملة/مؤقتًا ، فيمكنك إعداد برنامج نصي لإزالة الوظائف عند الطلب كما يلي لإزالة الوظائف المكتملة n
:
kue . Job . rangeByState ( 'complete' , 0 , n , 'asc' , function ( err , jobs ) {
jobs . forEach ( function ( job ) {
job . remove ( function ( ) {
console . log ( 'removed ' , job . id ) ;
} ) ;
} ) ;
} ) ;
لاحظ أنه يجب عليك توفير الوقت الكافي لـ .remove
المكالمات على كل كائن عمل لإكماله قبل خروج العملية ، أو ستتسرب فهارس الوظائف
بشكل افتراضي ، ستتصل Kue بـ Redis باستخدام إعدادات العميل الافتراضية (الافتراضية المنفذ إلى 6379
، والمضيف الافتراضي إلى 127.0.0.1
، والبادئة الافتراضية إلى q
). Queue#createQueue(options)
تقبل خيارات الاتصال Redis في options.redis
Key.
var kue = require ( 'kue' ) ;
var q = kue . createQueue ( {
prefix : 'q' ,
redis : {
port : 1234 ,
host : '10.0.50.20' ,
auth : 'password' ,
db : 3 , // if provided select a non-default redis db
options : {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
} ) ;
prefix
تتحكم في أسماء المفاتيح المستخدمة في redis. بشكل افتراضي ، هذا ببساطة q
. لا ينبغي تغيير البادئة عمومًا إلا إذا كنت بحاجة إلى استخدام مثيل redis واحد لتطبيقات متعددة. يمكن أن يكون مفيدًا أيضًا لتوفير اختبار معزول عبر تطبيقك الرئيسي.
يمكنك أيضًا تحديد معلومات الاتصال كسلسلة عناوين URL.
var q = kue . createQueue ( {
redis : 'redis://example.com:1234?redis_option=value&redis_option=value'
} ) ;
نظرًا لأن Node_redis يدعم مآخذ مجال UNIX ، يمكنك أيضًا إخبار Kue بالقيام بذلك. انظر مقبس Unix-Domain لتكوين Redis Server الخاص بك.
var kue = require ( 'kue' ) ;
var q = kue . createQueue ( {
prefix : 'q' ,
redis : {
socket : '/data/sockets/redis.sock' ,
auth : 'password' ,
options : {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
} ) ;
يمكن حقن أي مكتبة عميل Node.js التي تتوافق (أو عند تكييفها) مع API Node_redis في Kue. يجب عليك فقط توفير وظيفة createClientFactory
كمصنع اتصال Redis بدلاً من توفير خيارات اتصال Node_redis.
فيما يلي رمز عينة لتمكين Redis-Sentinel من الاتصال بـ Redis Sentinel من أجل فشل Master/Slave التلقائي.
var kue = require ( 'kue' ) ;
var Sentinel = require ( 'redis-sentinel' ) ;
var endpoints = [
{ host : '192.168.1.10' , port : 6379 } ,
{ host : '192.168.1.11' , port : 6379 }
] ;
var opts = options || { } ; // Standard node_redis client options
var masterName = 'mymaster' ;
var sentinel = Sentinel . Sentinel ( endpoints ) ;
var q = kue . createQueue ( {
redis : {
createClientFactory : function ( ) {
return sentinel . createClient ( masterName , opts ) ;
}
}
} ) ;
لاحظ أن جميع رموز العميل <0.8.x
يجب أن يتم إعادة تمهيدها لتمرير خيارات redis إلى Queue#createQueue
بدلاً من النمط المرقّد القرد المتجاوز من redis#createClient
أو سيتم كسرها من Kue 0.8.x
var Redis = require ( 'ioredis' ) ;
var kue = require ( 'kue' ) ;
// using https://github.com/72squared/vagrant-redis-cluster
var queue = kue . createQueue ( {
redis : {
createClientFactory : function ( ) {
return new Redis . Cluster ( [ {
port : 7000
} , {
port : 7001
} ] ) ;
}
}
} ) ;
واجهة المستخدم هو تطبيق صريح صغير. يتم توفير برنامج نصي في bin/
لتشغيل الواجهة كتطبيق مستقل مع إعدادات افتراضية. يمكنك تمرير خيارات للمنفذ ، Redis-url ، والبادئة. على سبيل المثال:
node_modules/kue/bin/kue-dashboard -p 3050 -r redis://127.0.0.1:3000 -q prefix
يمكنك إطلاق النار من داخل تطبيق آخر أيضًا:
var kue = require ( 'kue' ) ;
kue . createQueue ( ... ) ;
kue . app . listen ( 3000 ) ;
يتخلف العنوان إلى "Kue" ، لتغيير هذا الاستدعاء:
kue . app . set ( 'title' , 'My Application' ) ;
لاحظ أنه إذا كنت تستخدم خيارات Kue غير الافتراضية ، فيجب استدعاء kue.createQueue(...)
قبل الوصول إلى kue.app
.
يمكنك أيضًا استخدام واجهة الويب KUE-UI التي ساهم بها Arnaud Bénard
جنبا إلى جنب مع واجهة المستخدم kue يعرض أيضا واجهة برمجة تطبيقات JSON ، والتي تستخدمها واجهة المستخدم.
وظائف الاستعلام ، على سبيل المثال "Get /Job /Search؟ q = avi video":
[ "5" , "7" , "10" ]
بشكل افتراضي ، يقوم Kue بفهرسة كائن بيانات الوظيفة بالكامل للبحث ، ولكن يمكن تخصيص هذا من خلال استدعاء Job#searchKeys
لإخبار Kue عن مفاتيح بيانات الوظيفة لإنشاء فهرس لـ:
var kue = require ( 'kue' ) ;
queue = kue . createQueue ( ) ;
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . searchKeys ( [ 'to' , 'title' ] ) . save ( ) ;
يتم إيقاف ميزة البحث افتراضيًا من Kue >=0.9.0
. اقرأ المزيد عن هذا هنا. يجب عليك تمكين فهارس البحث وإضافة حمراء في تبعياتك إذا كنت بحاجة إلى:
var kue = require ( 'kue' ) ;
q = kue . createQueue ( {
disableSearch : false
} ) ;
npm install reds --save
يستجيب حاليًا بتهم الدولة ، ووقت نشاط العمال بالمللي ثانية:
{ "inactiveCount" : 4 , "completeCount" : 69 , "activeCount" : 2 , "failedCount" : 0 , "workTime" : 20892 }
احصل على وظيفة :id
:
{ "id" : "3" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : "100" , "state" : "complete" , "attempts" : null , "created_at" : "1309973155248" , "updated_at" : "1309973155248" , "duration" : "15002" }
احصل على وظيفة :id
:
[ 'foo' , 'bar' , 'baz' ]
احصل على وظائف مع النطاق المحدد :from
إلى :to
، على سبيل المثال "/jobs/0..2" ، حيث :order
"ASC" أو "desc":
[ { "id" : "12" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : 0 , "state" : "active" , "attempts" : null , "created_at" : "1309973299293" , "updated_at" : "1309973299293" } , { "id" : "130" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : 0 , "state" : "active" , "attempts" : null , "created_at" : "1309975157291" , "updated_at" : "1309975157291" } ]
كما هو مذكور أعلاه ، تقييد :state
التي هي واحدة من:
- active
- inactive
- failed
- complete
كما هو مذكور أعلاه ، مهما كان مقيدًا بـ :type
و :state
.
حذف الوظيفة :id
:
$ curl -X DELETE http://local:3000/job/2
{"message":"job 2 removed"}
إنشاء وظيفة:
$ curl -H "Content-Type: application/json" -X POST -d
'{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "[email protected]",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
}' http://localhost:3000/job
{"message": "job created", "id": 3}
يمكنك إنشاء وظائف متعددة مرة واحدة عن طريق تمرير صفيف. في هذه الحالة ، ستكون الاستجابة عبارة عن صفيف أيضًا ، مع الحفاظ على الترتيب:
$ curl -H "Content-Type: application/json" -X POST -d
'[{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "[email protected]",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
},
{
"type": "email",
"data": {
"title": "followup email for tj",
"to": "[email protected]",
"template": "followup-email"
},
"options" : {
"delay": 86400,
"attempts": 5,
"priority": "high"
}
}]' http://localhost:3000/job
[
{"message": "job created", "id": 4},
{"message": "job created", "id": 5}
]
ملاحظة: عند إدخال وظائف متعددة بكميات كبيرة ، إذا فشل إدخال واحد ، فسيستمر Kue في معالجة الوظائف المتبقية بالترتيب. سيحتوي صفيف الاستجابة على معرفات الوظائف المضافة بنجاح ، وسيكون أي عنصر فاشل كائنًا يصف الخطأ: {"error": "error reason"}
.
يوضح المثال أدناه كيف يمكنك استخدام Cluster لنشر تحميل معالجة الوظائف عبر وحدات المعالجة المركزية. يرجى الاطلاع على وثائق وحدة المجموعة للحصول على أمثلة أكثر تفصيلاً حول استخدامه.
عندما يتم تنفيذ الملف .isMaster
يتم تنفيذ المنطق في كتلة else
لكل عامل .
var kue = require ( 'kue' )
, cluster = require ( 'cluster' )
, queue = kue . createQueue ( ) ;
var clusterWorkerSize = require ( 'os' ) . cpus ( ) . length ;
if ( cluster . isMaster ) {
kue . app . listen ( 3000 ) ;
for ( var i = 0 ; i < clusterWorkerSize ; i ++ ) {
cluster . fork ( ) ;
}
} else {
queue . process ( 'email' , 10 , function ( job , done ) {
var pending = 5
, total = pending ;
var interval = setInterval ( function ( ) {
job . log ( 'sending!' ) ;
job . progress ( total - pending , total ) ;
-- pending || done ( ) ;
pending || clearInterval ( interval ) ;
} , 1000 ) ;
} ) ;
}
سيؤدي ذلك إلى إنشاء معالج وظيفة email
(عامل) لكل من مراكز وحدة المعالجة المركزية في جهازك ، مع كل منها يمكنك التعامل مع 10 وظائف بريد إلكتروني متزامنة ، مما يؤدي إلى 10 * N
وظائف البريد الإلكتروني المتزامنة في جهاز N
Core الخاص بك.
الآن عندما تزور واجهة المستخدم في Kue في المتصفح ، سترى أن الوظائف تتم معالجتها بشكل أسرع N
! (إذا كان لديك نور N
).
من خلال استخدام تركيب التطبيق ، يمكنك تخصيص تطبيق الويب ، أو تمكين TLS ، أو إضافة برامج وسيطة إضافية مثل basic-auth-connect
.
$ npm install --save basic-auth-connect
var basicAuth = require ( 'basic-auth-connect' ) ;
var app = express . createServer ( { ... tls options ... } ) ;
app . use ( basicAuth ( 'foo' , 'bar' ) ) ;
app . use ( kue . app ) ;
app . listen ( 3000 ) ;
تمكين وضع الاختبار لدفع جميع الوظائف إلى صفيف jobs
. قم بتأكيد ضد الوظائف في تلك الصفيف لضمان أن الرمز الخاضع للاختبار هو وظائف لاستئناف بشكل صحيح.
queue = require ( 'kue' ) . createQueue ( ) ;
before ( function ( ) {
queue . testMode . enter ( ) ;
} ) ;
afterEach ( function ( ) {
queue . testMode . clear ( ) ;
} ) ;
after ( function ( ) {
queue . testMode . exit ( )
} ) ;
it ( 'does something cool' , function ( ) {
queue . createJob ( 'myJob' , { foo : 'bar' } ) . save ( ) ;
queue . createJob ( 'anotherJob' , { baz : 'bip' } ) . save ( ) ;
expect ( queue . testMode . jobs . length ) . to . equal ( 2 ) ;
expect ( queue . testMode . jobs [ 0 ] . type ) . to . equal ( 'myJob' ) ;
expect ( queue . testMode . jobs [ 0 ] . data ) . to . eql ( { foo : 'bar' } ) ;
} ) ;
هام: بشكل افتراضي لا تتم معالجة الوظائف عند إنشائها أثناء وضع الاختبار. يمكنك تمكين معالجة الوظائف عن طريق تمرير True to testmode.enter
before ( function ( ) {
queue . testMode . enter ( true ) ;
} ) ;
نحن نحب المساهمات!
عند المساهمة ، اتبع القواعد البسيطة:
(ترخيص معهد ماساتشوستس للتكنولوجيا)
حقوق الطبع والنشر (C) 2011 LearnBoost <[email protected]>
يتم منح الإذن بموجب هذا ، مجانًا ، لأي شخص يحصل على نسخة من هذا البرنامج وملفات الوثائق المرتبطة ("البرنامج") ، للتعامل في البرنامج دون تقييد ، بما في ذلك على سبيل المثال لا الحصر حقوق استخدام ، نسخ ، تعديل ، دمج أو نشر نسخ وتوزيعها و/أو بيعها و/أو بيع نسخ من البرامج ، والسماح للأشخاص الذين يتم تقديم البرنامج لهم للقيام بذلك ، مع مراعاة الشروط التالية:
يجب إدراج إشعار حقوق الطبع والنشر أعلاه وإشعار الإذن هذا في جميع النسخ أو الأجزاء الكبيرة من البرنامج.
يتم توفير البرنامج "كما هو" ، دون أي ضمان من أي نوع ، صريح أو ضمني ، بما في ذلك على سبيل المثال لا الحصر ضمانات القابلية للتسويق واللياقة لغرض معين وعدم الانفجارات. لن يكون المؤلفون أو حاملي حقوق الطبع والنشر بأي حال من الأحوال مسؤولاً عن أي مطالبة أو أضرار أو مسؤولية أخرى ، سواء في إجراء عقد أو أضرار أو غير ذلك ، ناشئة عن البرامج أو خارجها أو الاستخدام أو غيرها برمجة.