Intégration de Django avec RQ, une bibliothèque de files d'attente Python basée sur Redis. Django-RQ est une application simple qui vous permet de configurer vos files d'attente dans settings.py
de Django et de les utiliser facilement dans votre projet.
Si vous trouvez django-rq
utile, pensez à soutenir son développement via Tidelift.
django-rq
(ou téléchargez depuis PyPI) : pip install django - rq
django_rq
à INSTALLED_APPS
dans settings.py
: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
de Django : RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
django_rq.urls
dans votre urls.py
: urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ vous permet de placer facilement des tâches dans n'importe laquelle des files d'attente définies dans settings.py
. Il est livré avec quelques fonctions utilitaires :
enqueue
- pousse un travail vers la file d'attente default
: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
- renvoie une instance Queue
. import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
En plus de l'argument name
, get_queue
accepte également les arguments default_timeout
, is_async
, autocommit
, connection
et queue_class
. Par exemple:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
Vous pouvez fournir votre propre objet de connexion Redis singleton à cette fonction afin qu'elle ne crée pas de nouvel objet de connexion pour chaque définition de file d'attente. Cela vous aidera à limiter le nombre de connexions au serveur Redis. Par exemple:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
- accepte un seul argument de nom de file d'attente (la valeur par défaut est "default") et renvoie une connexion au serveur Redis de la file d'attente : import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
- accepte les noms de file d'attente facultatifs et renvoie une nouvelle instance de RQ Worker
pour les files d'attente spécifiées (ou la file d'attente default
) : import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
Pour transformer facilement un appelable en tâche RQ, vous pouvez également utiliser le décorateur @job
fourni avec django_rq
:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
Vous pouvez transmettre tous les arguments acceptés par le décorateur de travail de RQ :
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
Il est possible de spécifier la valeur par défaut pour l'argument de mot-clé du décorateur result_ttl
via le paramètre DEFAULT_RESULT_TTL
:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
Avec ce paramètre, le décorateur de tâches définira result_ttl
sur 5 000, sauf si cela est spécifié explicitement.
django_rq fournit une commande de gestion qui démarre un travailleur pour chaque file d'attente spécifiée comme arguments :
python manage.py rqworker élevé par défaut faible
Si vous souhaitez exécuter rqworker
en mode rafale, vous pouvez transmettre l'indicateur --burst
:
python manage.py rqworker haute valeur par défaut faible --burst
Si vous devez utiliser des classes de tâches, de tâches ou de files d'attente personnalisées, il est préférable d'utiliser des paramètres globaux (voir Classes de files d'attente personnalisées et Classes de tâches et de tâches personnalisées). Cependant, il est également possible de remplacer ces paramètres avec les options de ligne de commande comme suit.
Pour utiliser une classe de travail personnalisée, vous pouvez transmettre l'indicateur --worker-class
avec le chemin d'accès à votre travailleur :
python manage.py rqworker high default low --worker-class 'path.to.GeventWorker'
Pour utiliser une classe de file d'attente personnalisée, vous pouvez transmettre l'indicateur --queue-class
avec le chemin d'accès à votre classe de file d'attente :
python manage.py rqworker high default low --queue-class 'path.to.CustomQueue'
Pour utiliser une classe de travail personnalisée, fournissez l'indicateur --job-class
.
À partir de la version 2.10, l'exécution du pool de tâches de RQ est également prise en charge :
python manage.py rqworker-pool par défaut faible moyen --num-workers 4
Avec QR 1.2.0. vous pouvez utiliser le planificateur intégré pour vos tâches. Par exemple:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Si vous utilisez le planificateur intégré, vous devez démarrer les Workers avec la prise en charge du planificateur :
python manage.py rqworker --with-scheduler
Vous pouvez également utiliser RQ Scheduler. Après l'installation, vous pouvez également utiliser la fonction get_scheduler
pour renvoyer une instance Scheduler
pour les files d'attente définies dans RQ_QUEUES
de settings.py. Par exemple:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Vous pouvez également utiliser la commande de gestion rqscheduler
pour démarrer le planificateur :
python manage.py rqscheduler
Si Django-redis ou Django-redis-cache est installé, vous pouvez demander à Django_rq d'utiliser les mêmes informations de connexion à partir de votre cache Redis. Cela présente deux avantages : c'est DRY et cela profite de toute optimisation qui peut être en cours dans la configuration de votre cache (comme l'utilisation du pool de connexions ou de Hiredis.)
Pour l'utiliser, utilisez un dict avec la clé USE_REDIS_CACHE
pointant vers le nom du cache souhaité dans votre dict RQ_QUEUES
. Il va de soi que le cache choisi doit exister et utiliser le backend Redis. Consultez la documentation de votre package de cache Redis respectif pour les instructions de configuration. Il est également important de souligner que puisque Django-redis-cache ShardedClient
divise le cache sur plusieurs connexions Redis, cela ne fonctionne pas.
Voici un exemple de fragment de paramètres pour Django-redis :
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
Parfois, vous souhaiterez peut-être suspendre RQ pour l'empêcher de traiter de nouvelles tâches. Un exemple classique est celui de la phase initiale d'un script de déploiement ou avant la mise de votre site en mode maintenance. Ceci est particulièrement utile lorsque vous avez des tâches qui durent relativement longtemps et qui pourraient autrement être supprimées de force pendant le déploiement.
La commande suspend empêche les travailleurs de _toutes_ les files d'attente (dans une seule base de données Redis) de récupérer de nouvelles tâches. Cependant, les tâches en cours d'exécution se poursuivront jusqu'à leur fin.
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
fournit également un tableau de bord pour surveiller l'état de vos files d'attente sur /django-rq/
(ou quelle que soit l'URL que vous avez définie dans votre urls.py
lors de l'installation.
Vous pouvez également ajouter un lien vers ce lien de tableau de bord dans /admin
en ajoutant RQ_SHOW_ADMIN_LINK = True
dans settings.py
. Attention cependant, cela remplacera le modèle d'administration par défaut et pourrait donc interférer avec d'autres applications qui modifient le modèle d'administration par défaut.
Ces statistiques sont également disponibles au format JSON via /django-rq/stats.json
, accessible aux membres du personnel. Si vous devez accéder à cette vue via d'autres clients HTTP (à des fins de surveillance), vous pouvez définir RQ_API_TOKEN
et y accéder via /django-rq/stats.json/<API_TOKEN>
.
Remarque : Les statistiques des tâches planifiées affichent les tâches du planificateur intégré RQ, et non du planificateur RQ facultatif.
De plus, ces statistiques sont également accessibles depuis la ligne de commande.
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry doit être configuré dans Django settings.py
comme décrit dans la documentation Sentry.
Vous pouvez remplacer la configuration par défaut de Django Sentry lors de l'exécution de la commande rqworker
en passant l'option sentry-dsn
:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
Cela remplacera toute configuration Django existante et réinitialisera Sentry, en définissant les options Sentry suivantes :
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ utilise logging
de Python, cela signifie que vous pouvez facilement configurer le mécanisme de journalisation de rqworker
dans settings.py
de Django. Par exemple:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
Par défaut, chaque file d'attente utilisera la classe DjangoRQ
. Si vous souhaitez utiliser une classe de file d'attente personnalisée, vous pouvez le faire en ajoutant une option QUEUE_CLASS
par file d'attente dans RQ_QUEUES
:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
ou vous pouvez spécifier DjangoRQ
pour utiliser une classe personnalisée pour toutes vos files d'attente dans les paramètres RQ
:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
Les classes de files d'attente personnalisées doivent hériter de django_rq.queues.DjangoRQ
.
Si vous utilisez plusieurs classes de file d'attente (non recommandé), veillez à exécuter uniquement les nœuds de calcul sur des files d'attente ayant la même classe de file d'attente. Par exemple, si vous avez deux files d'attente définies dans RQ_QUEUES
et que l'une d'elles a une classe personnalisée spécifiée, vous devrez exécuter au moins deux travailleurs distincts pour chaque file d'attente.
De la même manière que les classes de file d'attente personnalisées, les classes de travail et de travail personnalisées globales peuvent être configurées à l'aide des paramètres JOB_CLASS
et WORKER_CLASS
:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
La classe de travail personnalisée doit hériter de rq.job.Job
. Il sera utilisé pour tous les travaux s'il est configuré.
La classe de travail personnalisée doit hériter de rq.worker.Worker
. Il sera utilisé pour exécuter tous les travailleurs, sauf s'il est remplacé par l'option worker-class
de la commande de gestion rqworker
.
Pour un processus de test plus simple, vous pouvez exécuter un travailleur de manière synchrone de cette manière :
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
Vous pouvez définir l'option ASYNC
sur False
pour que l'opération synchrone soit la valeur par défaut pour une file d'attente donnée. Cela entraînera l'exécution des tâches immédiatement et sur le même thread au moment où elles sont distribuées, ce qui est utile pour les tests et le débogage. Par exemple, vous pouvez ajouter les éléments suivants après la configuration de la file d'attente dans votre fichier de paramètres :
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
Notez que la définition explicite du paramètre is_async
lors de l’appel get_queue
remplacera ce paramètre.
Pour exécuter la suite de tests de django_rq
:
`quel django-admin` teste django_rq --settings=django_rq.tests.settings --pythonpath=.
Créez un service rqworker qui exécute les files d'attente haute, par défaut et basse.
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
Activer et démarrer le service
sudo systemctl enable rqworker
sudo systemctl start rqworker
Ajoutez Django-rq à votre fichier exigences.txt avec :
pip freeze > requirements.txt
Mettez à jour votre profil pour :
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
Validez et redéployez. Ajoutez ensuite votre nouveau travailleur avec :
heroku scale worker=1
Voir CHANGELOG.md.