Django-Integration mit RQ, einer Redis-basierten Python-Warteschlangenbibliothek. Django-RQ ist eine einfache App, mit der Sie Ihre Warteschlangen in settings.py
von Django konfigurieren und problemlos in Ihrem Projekt verwenden können.
Wenn Sie django-rq
nützlich finden, denken Sie bitte darüber nach, seine Entwicklung über Tidelift zu unterstützen.
django-rq
(oder laden Sie es von PyPI herunter): pip install django - rq
django_rq
zu INSTALLED_APPS
in settings.py
hinzu: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
: RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
django_rq.urls
in Ihre urls.py
ein: urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Mit Django-RQ können Sie Jobs ganz einfach in jede der in settings.py
definierten Warteschlangen stellen. Es verfügt über einige Hilfsfunktionen:
enqueue
– einen Job in die default
verschieben: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
– gibt eine Queue
Instanz zurück. import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
Zusätzlich zum name
akzeptiert get_queue
auch die Argumente default_timeout
, is_async
, autocommit
, connection
und queue_class
. Zum Beispiel:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
Sie können dieser Funktion Ihr eigenes Singleton-Redis-Verbindungsobjekt bereitstellen, sodass nicht für jede Warteschlangendefinition ein neues Verbindungsobjekt erstellt wird. Dadurch können Sie die Anzahl der Verbindungen zum Redis-Server begrenzen. Zum Beispiel:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
– akzeptiert ein einzelnes Warteschlangennamenargument (standardmäßig „default“) und gibt eine Verbindung zum Redis-Server der Warteschlange zurück: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
– akzeptiert optionale Warteschlangennamen und gibt eine neue RQ- Worker
Instanz für angegebene Warteschlangen (oder default
) zurück: import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
Um einen Callable einfach in eine RQ-Aufgabe umzuwandeln, können Sie auch den @job
decorator verwenden, der mit django_rq
geliefert wird:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
Sie können alle Argumente übergeben, die der Job-Dekorator von RQ akzeptiert:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
Es ist möglich, den Standardwert für das Schlüsselwortargument result_ttl
decorator über die Einstellung DEFAULT_RESULT_TTL
anzugeben:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
Mit dieser Einstellung setzt der Job Decorator result_ttl
auf 5000, sofern dies nicht explizit angegeben wird.
django_rq stellt einen Verwaltungsbefehl bereit, der einen Worker für jede als Argumente angegebene Warteschlange startet:
Python manage.py rqworker hoch Standard niedrig
Wenn Sie rqworker
im Burst-Modus ausführen möchten, können Sie das Flag --burst
übergeben:
Python manage.py rqworker hoch Standard niedrig --burst
Wenn Sie benutzerdefinierte Worker-, Job- oder Warteschlangenklassen verwenden müssen, verwenden Sie am besten globale Einstellungen (siehe Benutzerdefinierte Warteschlangenklassen und Benutzerdefinierte Job- und Worker-Klassen). Es ist jedoch auch möglich, solche Einstellungen wie folgt mit Befehlszeilenoptionen zu überschreiben.
Um eine benutzerdefinierte Worker-Klasse zu verwenden, können Sie das Flag --worker-class
mit dem Pfad zu Ihrem Worker übergeben:
python manage.py rqworker hoch Standard niedrig --worker-class 'path.to.GeventWorker'
Um eine benutzerdefinierte Warteschlangenklasse zu verwenden, können Sie das Flag --queue-class
mit dem Pfad zu Ihrer Warteschlangenklasse übergeben:
Python manage.py rqworker hoch Standard niedrig --queue-class 'path.to.CustomQueue'
Um eine benutzerdefinierte Jobklasse zu verwenden, stellen Sie das Flag --job-class
bereit.
Ab Version 2.10 wird auch die Ausführung des Worker-Pools von RQ unterstützt:
python manage.py rqworker-pool Standard niedrig mittel --num-workers 4
Mit RQ 1.2.0. Sie können den integrierten Planer für Ihre Jobs verwenden. Zum Beispiel:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Wenn Sie den integrierten Scheduler verwenden, müssen Sie Worker mit Scheduler-Unterstützung starten:
python manage.py rqworker --with-scheduler
Alternativ können Sie den RQ Scheduler verwenden. Nach der Installation können Sie auch die Funktion get_scheduler
verwenden, um eine Scheduler
Instanz für Warteschlangen zurückzugeben, die in RQ_QUEUES
von Settings.py definiert sind. Zum Beispiel:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Sie können den Scheduler auch mit dem Verwaltungsbefehl rqscheduler
starten:
python manage.py rqscheduler
Wenn Sie django-redis oder django-redis-cache installiert haben, können Sie django_rq anweisen, dieselben Verbindungsinformationen aus Ihrem Redis-Cache zu verwenden. Dies hat zwei Vorteile: Es ist trocken und nutzt alle Optimierungen, die möglicherweise in Ihrem Cache-Setup durchgeführt werden (z. B. die Verwendung von Verbindungspooling oder Hiredis).
Um es zu konfigurieren, verwenden Sie ein Diktat mit dem Schlüssel USE_REDIS_CACHE
der auf den Namen des gewünschten Caches in Ihrem RQ_QUEUES
Dikt zeigt. Es versteht sich von selbst, dass der gewählte Cache vorhanden sein und das Redis-Backend verwenden muss. Konfigurationsanweisungen finden Sie in der Dokumentation Ihres jeweiligen Redis-Cache-Pakets. Es ist auch wichtig darauf hinzuweisen, dass der django-redis-cache ShardedClient
den Cache auf mehrere Redis-Verbindungen aufteilt und daher nicht funktioniert.
Hier ist ein Beispiel-Einstellungsfragment für Django-Redis:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
Manchmal möchten Sie RQ möglicherweise anhalten, um zu verhindern, dass neue Jobs verarbeitet werden. Ein klassisches Beispiel ist die Anfangsphase eines Bereitstellungsskripts oder bevor Sie Ihre Site in den Wartungsmodus versetzen. Dies ist besonders hilfreich, wenn Sie Jobs haben, die relativ lange laufen und andernfalls während der Bereitstellung zwangsweise beendet werden könnten.
Der Suspend-Befehl verhindert, dass Worker in _allen_ Warteschlangen (in einer einzelnen Redis-Datenbank) neue Jobs aufnehmen. Derzeit ausgeführte Jobs werden jedoch bis zum Abschluss fortgesetzt.
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
bietet außerdem ein Dashboard zur Überwachung des Status Ihrer Warteschlangen unter /django-rq/
(oder einer beliebigen URL, die Sie während der Installation in Ihrer urls.py
festgelegt haben).
Sie können auch einen Link zu diesem Dashboard-Link in /admin
hinzufügen, indem Sie RQ_SHOW_ADMIN_LINK = True
in settings.py
hinzufügen. Seien Sie jedoch vorsichtig, da dadurch die Standard-Administratorvorlage überschrieben wird, sodass es zu Störungen bei anderen Apps kommen kann, die die Standard-Administratorvorlage ändern.
Diese Statistiken sind auch im JSON-Format über /django-rq/stats.json
verfügbar, das für Mitarbeiter zugänglich ist. Wenn Sie über andere HTTP-Clients auf diese Ansicht zugreifen müssen (zu Überwachungszwecken), können Sie RQ_API_TOKEN
definieren und über /django-rq/stats.json/<API_TOKEN>
darauf zugreifen.
Hinweis: Statistiken über geplante Jobs zeigen Jobs vom integrierten RQ-Scheduler an, nicht vom optionalen RQ-Scheduler.
Darüber hinaus sind diese Statistiken auch über die Befehlszeile zugänglich.
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry sollte innerhalb der settings.py
konfiguriert werden, wie in den Sentry-Dokumenten beschrieben.
Sie können die Standardkonfiguration von Django Sentry überschreiben, wenn Sie den Befehl rqworker
ausführen, indem Sie die Option sentry-dsn
übergeben:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
Dadurch werden alle vorhandenen Django-Konfigurationen überschrieben und Sentry neu initialisiert, wobei die folgenden Sentry-Optionen festgelegt werden:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ verwendet logging
von Python. Das bedeutet, dass Sie den Protokollierungsmechanismus von rqworker
einfach in settings.py
von Django konfigurieren können. Zum Beispiel:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
Standardmäßig verwendet jede Warteschlange DjangoRQ
Klasse. Wenn Sie eine benutzerdefinierte Warteschlangenklasse verwenden möchten, können Sie dies tun, indem Sie in RQ_QUEUES
eine QUEUE_CLASS
Option pro Warteschlange hinzufügen:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
oder Sie können DjangoRQ
in RQ
-Einstellungen angeben, um eine benutzerdefinierte Klasse für alle Ihre Warteschlangen zu verwenden:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
Benutzerdefinierte Warteschlangenklassen sollten von django_rq.queues.DjangoRQ
erben.
Wenn Sie mehr als eine Warteschlangenklasse verwenden (nicht empfohlen), stellen Sie sicher, dass Sie Worker nur in Warteschlangen mit derselben Warteschlangenklasse ausführen. Wenn Sie beispielsweise zwei Warteschlangen in RQ_QUEUES
definiert haben und für eine eine benutzerdefinierte Klasse angegeben ist, müssten Sie für jede Warteschlange mindestens zwei separate Worker ausführen.
Ähnlich wie benutzerdefinierte Warteschlangenklassen können globale benutzerdefinierte Job- und Worker-Klassen mithilfe der Einstellungen JOB_CLASS
und WORKER_CLASS
konfiguriert werden:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
Die benutzerdefinierte Jobklasse sollte von rq.job.Job
erben. Es wird für alle Jobs verwendet, sofern es konfiguriert ist.
Die benutzerdefinierte Worker-Klasse sollte von rq.worker.Worker
erben. Es wird zum Ausführen aller Worker verwendet, sofern es nicht durch die worker-class
Klassenoption rqworker
Verwaltungsbefehls überschrieben wird.
Für einen einfacheren Testprozess können Sie einen Worker auf diese Weise synchron ausführen:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
Sie können die Option ASYNC
auf False
setzen, um den synchronen Betrieb zum Standard für eine bestimmte Warteschlange zu machen. Dadurch werden Jobs sofort und im selben Thread ausgeführt, in dem sie versendet werden, was zum Testen und Debuggen nützlich ist. Beispielsweise könnten Sie nach der Warteschlangenkonfiguration in Ihrer Einstellungsdatei Folgendes hinzufügen:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
Beachten Sie, dass das explizite Festlegen des Parameters is_async
beim Aufruf von get_queue
diese Einstellung überschreibt.
So führen Sie die Testsuite von django_rq
aus:
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.
Erstellen Sie einen Rqworker-Dienst, der die Warteschlangen „Hoch“, „Standard“ und „Niedrig“ ausführt.
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
Aktivieren und starten Sie den Dienst
sudo systemctl enable rqworker
sudo systemctl start rqworker
Fügen Sie django-rq zu Ihrer Datei „requirements.txt“ hinzu mit:
pip freeze > requirements.txt
Aktualisieren Sie Ihr Profil auf:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
Festschreiben und erneut bereitstellen. Fügen Sie dann Ihren neuen Mitarbeiter hinzu mit:
heroku scale worker=1
Siehe CHANGELOG.md.