Bitte beachten Sie EG Bull als Alternative. Danke schön!
Kue ist eine vorrangige Jobwarteschlange, die von Redis unterstützt wird und für Node.js.
Protip Dies ist die neueste KUE -Dokumentation. Lesen Sie auch den ChangeList.
Letzte Veröffentlichung:
$ npm install kue
Master -Zweig:
$ npm install http://github.com/Automattic/kue/tarball/master
Erstellen Sie zuerst eine Queue
mit kue.createQueue()
:
var kue = require ( 'kue' )
, queue = kue . createQueue ( ) ;
Anrufen von queue.create()
mit der Art des Jobs ("E -Mail") und willkürlichen Jobdaten gibt einen Job
zurück, der dann save()
ed und addis zu einer Standardpriorität von "normal" hinzufügt. Die save()
-Methode akzeptiert optional einen Rückruf und antwortet mit einem error
, wenn etwas schief geht. Der title
ist mit Spezialbeschaffung und wird in den Stellenangeboten innerhalb der Benutzeroberfläche angezeigt, wodurch es einfacher ist, einen bestimmten Job zu finden.
var job = queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . save ( function ( err ) {
if ( ! err ) console . log ( job . id ) ;
} ) ;
Um die Priorität eines Jobs anzugeben, rufen Sie einfach die priority()
-Methode mit einer Nummer oder Prioritätsnamen auf, die einer Nummer zugeordnet ist.
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . priority ( 'high' ) . save ( ) ;
Die Standard -Prioritätskarte lautet wie folgt:
{
low : 10
, normal : 0
, medium : - 5
, high : - 10
, critical : - 15
} ;
Standardmäßig haben Jobs nur einen Versuch, wenn sie scheitern, sind sie als Misserfolg gekennzeichnet und bleiben so, bis Sie eingreifen. Mit Kue können Sie dies jedoch angeben, was für Jobs wie die Übertragung einer E -Mail wichtig ist, die nach dem Scheitern normalerweise ohne Probleme wiederholen kann. Um dies zu tun, berufen Sie die Methode .attempts()
mit einer Nummer.
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . priority ( 'high' ) . attempts ( 5 ) . save ( ) ;
Die Versuche von Job -Wiederholung werden durchgeführt, sobald sie scheitern, ohne Verzögerung, auch wenn Ihr Job eine Verzögerung durch Job#delay
festgelegt hat. Wenn Sie den Auftrag wieder auf Failures (als Backoff bezeichnet) verzögern möchten, können Sie Job#backoff
-Methode auf unterschiedliche Weise verwenden:
// Honor job's original delay (if set) at each attempt, defaults to fixed backoff
job . attempts ( 3 ) . backoff ( true )
// Override delay value, fixed backoff
job . attempts ( 3 ) . backoff ( { delay : 60 * 1000 , type : 'fixed' } )
// Enable exponential backoff using original delay (if set)
job . attempts ( 3 ) . backoff ( { type : 'exponential' } )
// Use a function to get a customized next attempt delay value
job . attempts ( 3 ) . backoff ( function ( attempts , delay ) {
//attempts will correspond to the nth attempt failure so it will start with 0
//delay will be the amount of the last delay, not the initial delay unless attempts === 0
return my_customized_calculated_delay ;
} )
Im letzten Szenario wird die Funktion (über Eval) bei jedem Wiederverhalten ausgeführt, um den nächsten Versuchsverzögerungswert zu erhalten, was bedeutet, dass Sie nicht externe/Kontextvariablen in ihm verweisen können.
Jobproduzenten können einen Ablaufwert für die Zeit festlegen, in der ihr Job im aktiven Zustand leben TTL exceeded
Parallelität.
queue . create ( 'email' , { title : 'email job with TTL' } ) . ttl ( milliseconds ) . save ( ) ;
Arbeitspezifische Protokolle ermöglichen es Ihnen, Informationen zu jedem Zeitpunkt in der Lebenszeit des Jobs der Benutzeroberfläche aufzuteilen. Um dies einfach zu tun, rufen Sie job.log()
auf, der eine Nachrichtenzeichenfolge sowie variable Argumente für die Sprintf-ähnliche Unterstützung akzeptiert:
job . log ( '$%d sent to %s' , amount , user . name ) ;
oder irgendetwas anderes (verwendet util.inspect () intern):
job . log ( { key : 'some key' , value : 10 } ) ;
job . log ( [ 1 , 2 , 3 , 5 , 8 ] ) ;
job . log ( 10.1 ) ;
Der Fortschritt des Arbeitsplatzes ist äußerst nützlich für langjährige Jobs wie die Videoumwandlung. Um den Fortschritt des Jobs zu aktualisieren, rufen Sie einfach auf job.progress(completed, total [, data])
auf:
job . progress ( frames , totalFrames ) ;
Daten können verwendet werden, um zusätzliche Informationen über den Job zu übergeben. Zum Beispiel eine Nachricht oder ein Objekt mit zusätzlichen Kontextdaten zum aktuellen Status.
Berufsspezifische Ereignisse werden in den Job
über Redis PubSub abgefeuert. Die folgenden Ereignisse werden derzeit unterstützt:
enqueue
the Job ist jetzt in der Warteschlangestart
den Job jetzt läuft jetztpromotion
Der Job wird vom verspäteten Staat in die Warteschlange befördertprogress
die Fortschritte des Jobs von 0-100failed attempt
Der Job ist gescheitert, hat aber noch verbleibende Versuchefailed
und hat keine verbleibenden Versuchecomplete
den Job abgeschlossenremove
den Job, wurde entferntZum Beispiel kann dies ungefähr wie folgt aussehen:
var job = queue . create ( 'video conversion' , {
title : 'converting loki's to avi'
, user : 1
, frames : 200
} ) ;
job . on ( 'complete' , function ( result ) {
console . log ( 'Job completed with data ' , result ) ;
} ) . on ( 'failed attempt' , function ( errorMessage , doneAttempts ) {
console . log ( 'Job failed' ) ;
} ) . on ( 'failed' , function ( errorMessage ) {
console . log ( 'Job failed' ) ;
} ) . on ( 'progress' , function ( progress , data ) {
console . log ( 'r job #' + job . id + ' ' + progress + '% complete with data ' , data ) ;
} ) ;
Beachten Sie , dass bei den Wiederherstellungen von Prozess -Level -Ereignissen nicht garantiert wird, dass der Prozess des Neustarts von Node.js den Verweis auf das spezifische Jobobjekt verliert. Wenn Sie einen zuverlässigeren Event -Handler wünschen, suchen Sie nach Warteschlangenereignissen.
Beachten Sie, dass Kue Jobobjekte im Speicher speichert, bis sie vollständig/nicht in der Lage sind, Ereignisse auf sie abzugeben. Wenn Sie eine enorme Parallelität in nicht abgeschlossenen Jobs haben, schalten Sie diese Funktion aus und verwenden Sie Ereignisse der Warteschlangenebene, um eine bessere Speicherskalierung zu erhalten.
kue . createQueue ( { jobEvents : false } )
Alternativ können Sie die events
für die Auftragsniveau verwenden, um zu steuern, ob Ereignisse für einen Job auf Jobebene entlassen werden.
var job = queue . create ( 'test' ) . events ( false ) . save ( ) ;
Ereignisse auf Warteschlangenebene bieten Zugriff auf die zuvor erwähnten Auftragsereignisse, die jedoch in die Queue
gesendet wurden, um Logik auf einer "globalen" Ebene anzuwenden. Ein Beispiel dafür ist das Entfernen von abgeschlossenen Jobs:
queue . on ( 'job enqueue' , function ( id , type ) {
console . log ( 'Job %s got queued of type %s' , id , type ) ;
} ) . on ( 'job complete' , function ( id , result ) {
kue . Job . get ( id , function ( err , job ) {
if ( err ) return ;
job . remove ( function ( err ) {
if ( err ) throw err ;
console . log ( 'removed completed job #%d' , job . id ) ;
} ) ;
} ) ;
} ) ;
Die verfügbaren Veranstaltungen sind die gleichen wie in "Job -Events", die jedoch "Job" vorangestellt sind.
Verspätete Jobs können für eine beliebige Zeit in der Warteschlange gestellt werden, indem die Methode .delay(ms)
aufgerufen wird und die Anzahl der Millisekunden in Bezug auf jetzt überholt. Alternativ können Sie in Zukunft ein JavaScript Date
mit einer bestimmten Zeit übergeben. Dies kennzeichnet den Job
automatisch als "verzögert".
var email = queue . create ( 'email' , {
title : 'Account renewal required'
, to : '[email protected]'
, template : 'renewal-email'
} ) . delay ( milliseconds )
. priority ( 'high' )
. save ( ) ;
KUE wird die verspäteten Jobs mit einem Timer überprüfen und sie bewerben, wenn die geplante Verzögerung überschritten wurde, was eine Überprüfung von Top 1000 -Jobs pro Sekunde überschritten hat.
Die Bearbeitung von Jobs ist bei KUE einfach. Erstellen Sie zunächst eine Queue
, ähnlich wie wir es für die Erstellung von Arbeitsplätzen tun und uns Zugriff auf Redis usw. ermöglichen, und rufen Sie dann queue.process()
mit dem zugehörigen Typ auf. Beachten Sie, dass im Gegensatz zu dem, was der Name createQueue
vorschlägt, derzeit eine Singleton Queue
-Instanz zurückgibt. Sie können also nur ein einzelnes Queue
in Ihrem Node.js -Prozess konfigurieren und verwenden.
Im folgenden Beispiel übergeben wir den Rückruf an email
, wenn ein Fehler auftritt, erregte wir mit dem Aufruf von done
done(err)
um KUE zu sagen, dass etwas passiert ist, andernfalls rufen wir done()
wenn der Job abgeschlossen ist. Wenn diese Funktion mit einem Fehler reagiert, wird sie in der Benutzeroberfläche angezeigt und der Job als Fehler gekennzeichnet. Das an die Fertigstellung übergebene Fehlerobjekt sollte vom Error
des Typs sein.
var kue = require ( 'kue' )
, queue = kue . createQueue ( ) ;
queue . process ( 'email' , function ( job , done ) {
email ( job . data . to , done ) ;
} ) ;
function email ( address , done ) {
if ( ! isValidEmail ( address ) ) {
//done('invalid to address') is possible but discouraged
return done ( new Error ( 'invalid to address' ) ) ;
}
// email send stuff...
done ( ) ;
}
Die Arbeitnehmer können auch das Arbeitsplatz als zweites Parameter übergeben done(null,result)
um dies in Job.result
-Schlüssel zu speichern. result
wird auch complete
Event -Handler weitergegeben, damit die Arbeitsproduzenten es erhalten können, wenn sie möchten.
Standardmäßig übernimmt ein Anruf bei queue.process()
jeweils nur einen Job für die Bearbeitung. Für kleine Aufgaben wie das Senden von E -Mails ist dies nicht ideal. Daher können wir die maximalen aktiven Jobs für diesen Typ angeben, indem wir eine Nummer übergeben:
queue . process ( 'email' , 20 , function ( job , done ) {
// ...
} ) ;
Arbeiter können ihre Aktivitäten vorübergehend innehalten und wieder aufnehmen. Das heißt, nachdem sie pause
angerufen haben, erhalten sie in ihrem Prozessrückruf keine Jobs, bis resume
gerufen wird. Die pause
-Funktion führt diesen Arbeiter anmutig herunter und verwendet dieselbe interne Funktionalität wie die shutdown
bei der Graceful Shutdown.
queue . process ( 'email' , function ( job , ctx , done ) {
ctx . pause ( 5000 , function ( err ) {
console . log ( "Worker is paused... " ) ;
setTimeout ( function ( ) { ctx . resume ( ) ; } , 10000 ) ;
} ) ;
} ) ;
Beachten Sie, dass der ctx
-Parameter von KUE >=0.9.0
das zweite Argument der Prozess -Rückruffunktion ist und done
ist immer immer das letzte Mal
Beachten Sie, dass die Signatur der pause
-Methode von kue >=0.9.0
geändert wird, um die Rückruffunktion auf die letzte zu verschieben.
Für ein "echtes" Beispiel müssen wir einen PDF aus zahlreichen Folien mit Node-Canvas kompilieren. Unser Job kann aus den folgenden Daten bestehen. Beachten Sie, dass Sie im Allgemeinen keine großen Daten im Job-Selbst speichern sollten. Es ist besser, Referenzen wie IDs zu speichern und sie während der Verarbeitung zu ziehen.
queue . create ( 'slideshow pdf' , {
title : user . name + "'s slideshow"
, slides : [ ... ] // keys to data stored in redis, mongodb, or some other store
} ) ;
Wir können während der Verarbeitung über die Eigenschaft job.data
auf dieselben willkürlichen Daten innerhalb eines separaten Prozesses zugreifen. In dem Beispiel machen wir jede Folie eins von eins und aktualisieren das Protokoll und Fortschritt des Jobs.
queue . process ( 'slideshow pdf' , 5 , function ( job , done ) {
var slides = job . data . slides
, len = slides . length ;
function next ( i ) {
var slide = slides [ i ] ; // pretend we did a query on this slide id ;)
job . log ( 'rendering %dx%d slide' , slide . width , slide . height ) ;
renderSlide ( slide , function ( err ) {
if ( err ) return done ( err ) ;
job . progress ( i , len , { nextSlide : i == len ? 'itsdone' : i + 1 } ) ;
if ( i == len ) done ( )
else next ( i + 1 ) ;
} ) ;
}
next ( 0 ) ;
} ) ;
Queue#shutdown([timeout,] fn)
signalisiert alle Arbeitnehmer, die Verarbeitung zu beenden, nachdem ihr aktueller aktiver Job erledigt ist. Die Arbeitnehmer warten timeout
-Millisekunden, damit ihre aktiven Arbeiten erledigt werden sollen, um aufgerufen zu werden, oder zu markieren, dass der aktive Job mit dem Herunterfahrenfehler aus failed
. Wenn alle Arbeiter Kue sagen, dass sie gestoppt werden, wird fn
gerufen.
var queue = require ( 'kue' ) . createQueue ( ) ;
process . once ( 'SIGTERM' , function ( sig ) {
queue . shutdown ( 5000 , function ( err ) {
console . log ( 'Kue shutdown: ' , err || '' ) ;
process . exit ( 0 ) ;
} ) ;
} ) ;
Beachten Sie , dass die Signatur des shutdown
von KUE >=0.9.0
geändert wird, um die Rückruffunktion auf die letzte zu verschieben.
Alle Fehler entweder in der Redis -Client -Bibliothek oder in der Warteschlange werden in das Queue
emittiert. Sie sollten an error
binden, um unbekannte Ausnahmen zu verhindern oder Kue -Fehler zu debuggen.
var queue = require ( 'kue' ) . createQueue ( ) ;
queue . on ( 'error' , function ( err ) {
console . log ( 'Oops... ' , err ) ;
} ) ;
KUE markiert einen Job, der vollständig/fehlgeschlagen ist, wenn Ihr Arbeiter done
wird. Sie sollten daher die richtige Fehlerbehandlung verwenden, um nicht erfasste Ausnahmen im Code Ihres Arbeiters zu verhindern. Dies kann auf zwei Arten erreicht werden:
queue . process ( 'my-error-prone-task' , function ( job , done ) {
var domain = require ( 'domain' ) . create ( ) ;
domain . on ( 'error' , function ( err ) {
done ( err ) ;
} ) ;
domain . run ( function ( ) { // your process function
throw new Error ( 'bad things happen' ) ;
done ( ) ;
} ) ;
} ) ;
HINWEIS - Domänen werden von NodeJs mit Stabilität 0 veraltet und es wird nicht empfohlen.
Dies ist die weichste und beste Lösung, ist jedoch nicht mit Kue integriert. Bitte beachten Sie diese Diskussion. Sie können diese Funktion in der zugehörigen offenen Kue -Ausgabe kommentieren.
Sie können auch Versprechen verwenden, um so etwas wie zu tun
queue . process ( 'my-error-prone-task' , function ( job , done ) {
Promise . method ( function ( ) { // your process function
throw new Error ( 'bad things happen' ) ;
} ) ( ) . nodeify ( done )
} ) ;
Dies werden jedoch keine Ausnahmen in Ihrem asynchronen Call -Stack wie Domains erfassen.
uncaughtException
und anmutiges Schließen des KUE. Dies ist jedoch keine empfohlene Fehlerhandlung in JavaScript, da Sie den Fehlerkontext verlieren. process . once ( 'uncaughtException' , function ( err ) {
console . error ( 'Something bad happened: ' , err ) ;
queue . shutdown ( 1000 , function ( err2 ) {
console . error ( 'Kue shutdown result: ' , err2 || 'OK' ) ;
process . exit ( 0 ) ;
} ) ;
} ) ;
KUE verwendet derzeit Client Side Job State Management, und wenn Redis mitten in diesen Operationen abstürzt, werden einige festgefahrene Jobs oder Index -Inkonsistenzen stattfinden. Die Folge ist, dass eine bestimmte Anzahl von Arbeitsplätzen festsitzt und nur dann vom Arbeitnehmer herausgezogen wird, wenn neue Arbeitsplätze geschaffen werden. Wenn keine neuen Arbeitsplätze mehr geschaffen werden, steckten sie für immer fest. Wir empfehlen also dringend , dass Sie Watchdog durchführen, um dieses Problem zu beheben, indem Sie anrufen:
queue . watchStuckJobs ( interval )
interval
ist in Millisekunden und standardmäßig 1000 ms
KUE wird aus Version 1.0 auf vollständig Atomic Job State Management neu ausgestattet. Sie können hier und hier mehr lesen.
Das Warteschlangenobjekt verfügt über zwei Arten von Methoden, um Ihnen über die Anzahl der Jobs in jedem Staat zu informieren
queue . inactiveCount ( function ( err , total ) { // others are activeCount, completeCount, failedCount, delayedCount
if ( total > 100000 ) {
console . log ( 'We need some back pressure here' ) ;
}
} ) ;
Sie können auch einen bestimmten Jobtyp abfragen:
queue . failedCount ( 'my-critical-job' , function ( err , total ) {
if ( total > 10000 ) {
console . log ( 'This is tOoOo bad' ) ;
}
} ) ;
und Iteration über Job -IDs
queue . inactive ( function ( err , ids ) { // others are active, complete, failed, delayed
// you may want to fetch each id to get the Job object out of it...
} ) ;
Der zweite skaliert jedoch nicht auf große Bereitstellungen. Dort können Sie spezifischere statische Job
verwenden:
kue . Job . rangeByState ( 'failed' , 0 , n , 'asc' , function ( err , jobs ) {
// you have an array of maximum n Job objects here
} ) ;
oder
kue . Job . rangeByType ( 'my-job-type' , 'failed' , 0 , n , 'asc' , function ( err , jobs ) {
// you have an array of maximum n Job objects here
} ) ;
Beachten Sie , dass die letzten beiden Methoden in späteren KUE -Versionen geändert werden können.
Wenn Sie in einem Abschnitt "Fehlerbehandlungsabschnitt" oder in keiner Weise aktive Jobs verloren haben, können Sie sich von ihnen erholen, wenn Ihr Prozess neu gestartet wird. Eine blinde Logik wäre, alle festgefahrenen Jobs wieder zu erzählen:
queue . active ( function ( err , ids ) {
ids . forEach ( function ( id ) {
kue . Job . get ( id , function ( err , job ) {
// Your application should check if job is a stuck one
job . inactive ( ) ;
} ) ;
} ) ;
} ) ;
Beachten Sie in einer Cluster -Bereitstellung. Ihre Anwendung sollte sich bewusst sein, dass sie keine gültige Aufgabe haben, die derzeit von anderen Arbeitnehmern nicht bearbeitet wird.
Jobdaten und Suchindizes essen den Redis-Speicherraum, sodass Sie einen Arbeitsplatzprozess in realen Bereitstellungen benötigen. Ihre erste Chance besteht darin, nach Abschluss automatischer Arbeitsplätze zu entfernen.
queue . create ( ... ) . removeOnComplete ( true ) . save ( )
Wenn Sie jedoch schließlich/zeitlich abgeschlossene Stellendaten benötigen, können Sie ein On-Demand-Job-Entfernungs-Skript wie unten einrichten, um die obersten n
abgeschlossenen Jobs zu entfernen:
kue . Job . rangeByState ( 'complete' , 0 , n , 'asc' , function ( err , jobs ) {
jobs . forEach ( function ( job ) {
job . remove ( function ( ) {
console . log ( 'removed ' , job . id ) ;
} ) ;
} ) ;
} ) ;
Beachten Sie , dass Sie genügend Zeit für .remove
-Anrufe für jedes Jobobjekt angeben sollten, um vor dem Ablauf Ihres Prozesses zu vervollständigen
Standardmäßig wird KUE mit den Client -Standardeinstellungen eine Verbindung zu Redis herstellen (Port -Standardeinstellungen zu 6379
, Host -Standardeinstellungen auf 127.0.0.1
, Präfix standardmäßig mit q
). Queue#createQueue(options)
akzeptiert Redis -Verbindungsoptionen in options.redis
-Schlüssel.
var kue = require ( 'kue' ) ;
var q = kue . createQueue ( {
prefix : 'q' ,
redis : {
port : 1234 ,
host : '10.0.50.20' ,
auth : 'password' ,
db : 3 , // if provided select a non-default redis db
options : {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
} ) ;
prefix
steuert die in Redis verwendeten Schlüsselnamen. Standardmäßig ist dies einfach q
. Das Präfix sollte im Allgemeinen nicht geändert werden, es sei denn, Sie müssen für mehrere Apps eine Redis -Instanz verwenden. Es kann auch nützlich sein, um ein isoliertes Testbett in Ihrer Hauptanwendung bereitzustellen.
Sie können auch die Verbindungsinformationen als URL -Zeichenfolge angeben.
var q = kue . createQueue ( {
redis : 'redis://example.com:1234?redis_option=value&redis_option=value'
} ) ;
Da NODE_REDIS UNIX -Domain -Sockets unterstützt, können Sie Kue auch sagen, dass dies dies tut. Siehe Unix-Domain-Socket für Ihre Redis-Server-Konfiguration.
var kue = require ( 'kue' ) ;
var q = kue . createQueue ( {
prefix : 'q' ,
redis : {
socket : '/data/sockets/redis.sock' ,
auth : 'password' ,
options : {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
} ) ;
Jede Node.js -Redis -Client -Bibliothek, die an die NODE_REDIS -API entspricht (oder wenn sie angepasst ist), kann in KUE injiziert werden. Sie sollten nur eine createClientFactory
-Funktion als Redis -Verbindungsfabrik bereitstellen, anstatt NODE_REDIS -Verbindungsoptionen bereitzustellen.
Im Folgenden finden Sie einen Beispielcode, mit dem Redis-Sentinel eine Verbindung zum Redis-Sentinel für automatische Master-/Slave-Failover herstellen kann.
var kue = require ( 'kue' ) ;
var Sentinel = require ( 'redis-sentinel' ) ;
var endpoints = [
{ host : '192.168.1.10' , port : 6379 } ,
{ host : '192.168.1.11' , port : 6379 }
] ;
var opts = options || { } ; // Standard node_redis client options
var masterName = 'mymaster' ;
var sentinel = Sentinel . Sentinel ( endpoints ) ;
var q = kue . createQueue ( {
redis : {
createClientFactory : function ( ) {
return sentinel . createClient ( masterName , opts ) ;
}
}
} ) ;
Beachten Sie , dass alle <0.8.x
-Clientcodes neu gestaltet werden sollten, um 0.8.x
-Optionen an die Übergabe von Redis redis#createClient
Queue#createQueue
übergeben.
var Redis = require ( 'ioredis' ) ;
var kue = require ( 'kue' ) ;
// using https://github.com/72squared/vagrant-redis-cluster
var queue = kue . createQueue ( {
redis : {
createClientFactory : function ( ) {
return new Redis . Cluster ( [ {
port : 7000
} , {
port : 7001
} ] ) ;
}
}
} ) ;
Die Benutzeroberfläche ist eine kleine Expressanwendung. Ein Skript wird in bin/
zum Ausführen der Schnittstelle als eigenständige Anwendung mit Standardeinstellungen bereitgestellt. Sie können Optionen für Port, Redis-URL und Präfix übergeben. Zum Beispiel:
node_modules/kue/bin/kue-dashboard -p 3050 -r redis://127.0.0.1:3000 -q prefix
Sie können es auch aus einer anderen Anwendung abfeuern:
var kue = require ( 'kue' ) ;
kue . createQueue ( ... ) ;
kue . app . listen ( 3000 ) ;
Der Titel ist standardmäßig "kue", um diesen Aufruf zu ändern:
kue . app . set ( 'title' , 'My Application' ) ;
Beachten Sie , dass kue.createQueue(...)
wenn Sie nicht-default-kue-Optionen verwenden, vor dem Zugriff auf kue.app
aufgerufen werden muss.
Sie können auch die KUE-UI-Webschnittstelle verwenden, die von Arnaud Bénard beigetragen wird
Zusammen mit der UI Kue enthüllt auch eine JSON -API, die von der Benutzeroberfläche verwendet wird.
Abfrageberichte, zum Beispiel "Get /Job /Suche? Q = Avi Video":
[ "5" , "7" , "10" ]
Standardmäßig indiziert KUE das gesamte Jobdatenobjekt für die Suche. Dies kann jedoch über den Aufruf von Job#searchKeys
angepasst werden, um KUE zu mitteilen, welche Schlüssel für Jobdaten für den Erstellen von Index für:
var kue = require ( 'kue' ) ;
queue = kue . createQueue ( ) ;
queue . create ( 'email' , {
title : 'welcome email for tj'
, to : '[email protected]'
, template : 'welcome-email'
} ) . searchKeys ( [ 'to' , 'title' ] ) . save ( ) ;
Die Suchfunktion wird standardmäßig von KUE >=0.9.0
ausgeschaltet. Lesen Sie hier mehr darüber. Sie sollten Suchindizes aktivieren und in Ihren Abhängigkeiten Rots hinzufügen, wenn Sie:
var kue = require ( 'kue' ) ;
q = kue . createQueue ( {
disableSearch : false
} ) ;
npm install reds --save
Derzeit reagiert mit staatlichen Zählungen und der Aktivitätszeit der Arbeitnehmer in Millisekunden:
{ "inactiveCount" : 4 , "completeCount" : 69 , "activeCount" : 2 , "failedCount" : 0 , "workTime" : 20892 }
Holen Sie sich einen Job von :id
:
{ "id" : "3" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : "100" , "state" : "complete" , "attempts" : null , "created_at" : "1309973155248" , "updated_at" : "1309973155248" , "duration" : "15002" }
Holen Sie sich Job :id
Protokoll:
[ 'foo' , 'bar' , 'baz' ]
Holen Sie sich Jobs mit dem angegebenen Bereich :from
:to
zum Beispiel "/jobs/0..2", wobei :order
"ASC" oder "Desc" sein kann:
[ { "id" : "12" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : 0 , "state" : "active" , "attempts" : null , "created_at" : "1309973299293" , "updated_at" : "1309973299293" } , { "id" : "130" , "type" : "email" , "data" : { "title" : "welcome email for tj" , "to" : "[email protected]" , "template" : "welcome-email" } , "priority" : - 10 , "progress" : 0 , "state" : "active" , "attempts" : null , "created_at" : "1309975157291" , "updated_at" : "1309975157291" } ]
Gleich wie oben, Einschränkung durch :state
, der eines von: gehört:
- active
- inactive
- failed
- complete
Gleich wie oben, wie beschränkt auf :type
und :state
.
Job löschen :id
:
$ curl -X DELETE http://local:3000/job/2
{"message":"job 2 removed"}
Erstellen Sie einen Job:
$ curl -H "Content-Type: application/json" -X POST -d
'{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "[email protected]",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
}' http://localhost:3000/job
{"message": "job created", "id": 3}
Sie können mehrere Jobs gleichzeitig erstellen, indem Sie ein Array verabschieden. In diesem Fall wird auch die Antwort ein Array sein, in dem die Bestellung beibehalten wird:
$ curl -H "Content-Type: application/json" -X POST -d
'[{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "[email protected]",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
},
{
"type": "email",
"data": {
"title": "followup email for tj",
"to": "[email protected]",
"template": "followup-email"
},
"options" : {
"delay": 86400,
"attempts": 5,
"priority": "high"
}
}]' http://localhost:3000/job
[
{"message": "job created", "id": 4},
{"message": "job created", "id": 5}
]
HINWEIS: Wenn Sie mehrere Jobs in Masse einfügen, wird KUE die verbleibenden Jobs in Ordnung weiterhin in Ordnung bringen. Das Antwortarray enthält die IDs der Aufgaben erfolgreich, und jedes fehlgeschlagene Element ist ein Objekt, das den Fehler beschreibt: {"error": "error reason"}
.
Das folgende Beispiel zeigt, wie Sie Cluster verwenden können, um die Auftragsverarbeitungslast über CPUs zu verbreiten. Weitere detailliertere Beispiele zur Verwendung von Cluster -Moduldokumentation finden Sie in der Verwendung.
Wenn Cluster .isMaster
die Datei im Kontext des Master -Prozesses ausgeführt wird, können Sie in diesem Fall Aufgaben ausführen, die Sie nur einmal wünschen, z. B. mit Kue gestartet werden. Die Logik im else
-Block wird pro Arbeiter ausgeführt.
var kue = require ( 'kue' )
, cluster = require ( 'cluster' )
, queue = kue . createQueue ( ) ;
var clusterWorkerSize = require ( 'os' ) . cpus ( ) . length ;
if ( cluster . isMaster ) {
kue . app . listen ( 3000 ) ;
for ( var i = 0 ; i < clusterWorkerSize ; i ++ ) {
cluster . fork ( ) ;
}
} else {
queue . process ( 'email' , 10 , function ( job , done ) {
var pending = 5
, total = pending ;
var interval = setInterval ( function ( ) {
job . log ( 'sending!' ) ;
job . progress ( total - pending , total ) ;
-- pending || done ( ) ;
pending || clearInterval ( interval ) ;
} , 1000 ) ;
} ) ;
}
Dadurch werden jeweils eine email
-Jobprozessor (Arbeiter) pro einzelner Ihrer Maschinen -CPU -Kerne erstellt. Mit jedem können Sie 10 gleichzeitige E -Mail -Jobs übernehmen, was zu insgesamt 10 * N
nähigen E -Mail -Jobs führt, die in Ihrem N
-Kerngerät verarbeitet werden.
Wenn Sie jetzt die Benutzeroberfläche von Kue im Browser besuchen, werden Sie feststellen, dass Jobs N
schneller verarbeitet werden! (Wenn Sie N
-Kerne haben).
Durch die Verwendung der App-Montage können Sie die Webanwendung anpassen, TLS aktivieren oder zusätzliche Middleware wie basic-auth-connect
hinzufügen.
$ npm install --save basic-auth-connect
var basicAuth = require ( 'basic-auth-connect' ) ;
var app = express . createServer ( { ... tls options ... } ) ;
app . use ( basicAuth ( 'foo' , 'bar' ) ) ;
app . use ( kue . app ) ;
app . listen ( 3000 ) ;
Aktivieren Sie den Testmodus, um alle Jobs in ein jobs
-Array zu bringen. Machen Sie Behauptungen gegen die Jobs in diesem Array, um sicherzustellen, dass der zu testende Code korrekt auf Arbeitsplätze geht.
queue = require ( 'kue' ) . createQueue ( ) ;
before ( function ( ) {
queue . testMode . enter ( ) ;
} ) ;
afterEach ( function ( ) {
queue . testMode . clear ( ) ;
} ) ;
after ( function ( ) {
queue . testMode . exit ( )
} ) ;
it ( 'does something cool' , function ( ) {
queue . createJob ( 'myJob' , { foo : 'bar' } ) . save ( ) ;
queue . createJob ( 'anotherJob' , { baz : 'bip' } ) . save ( ) ;
expect ( queue . testMode . jobs . length ) . to . equal ( 2 ) ;
expect ( queue . testMode . jobs [ 0 ] . type ) . to . equal ( 'myJob' ) ;
expect ( queue . testMode . jobs [ 0 ] . data ) . to . eql ( { foo : 'bar' } ) ;
} ) ;
WICHTIG: Standardmäßig werden Jobs nicht verarbeitet, wenn sie im Testmodus erstellt werden. Sie können die Bearbeitung von Arbeit
before ( function ( ) {
queue . testMode . enter ( true ) ;
} ) ;
Wir lieben Beiträge!
Befolgen Sie beim Beitrag die einfachen Regeln:
(Die MIT -Lizenz)
Copyright (c) 2011 Learnboost <[email protected]>
Die Erlaubnis wird hiermit kostenlos an eine Person erteilt, die eine Kopie dieser Software und zugehörigen Dokumentationsdateien (der 'Software') erhält, um die Software ohne Einschränkung zu behandeln, einschließlich ohne Einschränkung der Rechte zu verwenden, zu kopieren, zu modifizieren, zu verschmelzen, verschmelzen , veröffentlichen, vertreiben, unterlizenzieren und/oder Kopien der Software verkaufen und Personen, denen die Software dazu bereitgestellt wird, unter den folgenden Bedingungen ermöglicht:
Die oben genannte Copyright -Mitteilung und diese Erlaubnisbekanntmachung müssen in alle Kopien oder wesentlichen Teile der Software enthalten sein.
Die Software wird "wie ist" ohne Garantie jeglicher Art, ausdrücklich oder stillschweigend bereitgestellt, einschließlich, aber nicht beschränkt auf die Gewährleistung der Handelsfähigkeit, die Eignung für einen bestimmten Zweck und die Nichtverletzung. In keinem Fall haftet die Autoren oder Urheberrechtsinhaber für Ansprüche, Schäden oder andere Haftungen, sei es in einer Vertragsklage, unerlaubter Handlung oder anderweitig, aus oder im Zusammenhang mit der Software oder anderen Geschäften in der SOFTWARE.