Integración de Django con RQ, una biblioteca de colas de Python basada en Redis. Django-RQ es una aplicación sencilla que te permite configurar tus colas en settings.py
de Django y usarlas fácilmente en tu proyecto.
Si encuentra útil django-rq
, considere apoyar su desarrollo a través de Tidelift.
django-rq
(o descárguelo desde PyPI): pip install django - rq
django_rq
a INSTALLED_APPS
en settings.py
: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
de Django: RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
django_rq.urls
en tu urls.py
: urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ le permite colocar trabajos fácilmente en cualquiera de las colas definidas en settings.py
. Viene con algunas funciones de utilidad:
enqueue
: envía un trabajo a la cola default
: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
: devuelve una instancia Queue
. import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
Además del argumento name
, get_queue
también acepta los argumentos default_timeout
, is_async
, autocommit
, connection
y queue_class
. Por ejemplo:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
Puede proporcionar su propio objeto de conexión singleton de Redis a esta función para que no cree un nuevo objeto de conexión para cada definición de cola. Esto le ayudará a limitar la cantidad de conexiones al servidor Redis. Por ejemplo:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
: acepta un único argumento de nombre de cola (el valor predeterminado es "predeterminado") y devuelve una conexión al servidor Redis de la cola: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
: acepta nombres de cola opcionales y devuelve una nueva instancia de RQ Worker
para colas específicas (o cola default
): import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
Para convertir fácilmente una tarea invocable en una tarea RQ, también puedes usar el decorador @job
que viene con django_rq
:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
Puede pasar cualquier argumento que acepte el decorador de trabajos de RQ:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
Es posible especificar el valor predeterminado para el argumento de palabra clave del decorador result_ttl
mediante la configuración DEFAULT_RESULT_TTL
:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
Con esta configuración, el decorador de trabajos establecerá result_ttl
en 5000 a menos que se especifique explícitamente.
django_rq proporciona un comando de administración que inicia un trabajador para cada cola especificada como argumentos:
python administrar.py rqworker alto valor predeterminado bajo
Si desea ejecutar rqworker
en modo ráfaga, puede pasar el indicador --burst
:
python Manage.py rqworker alto valor predeterminado bajo --burst
Si necesita utilizar clases de cola, trabajo o trabajador personalizadas, es mejor utilizar la configuración global (consulte Clases de cola personalizadas y Clases de trabajo y trabajador personalizadas). Sin embargo, también es posible anular dichas configuraciones con opciones de línea de comando de la siguiente manera.
Para usar una clase de trabajador personalizada, puede pasar el indicador --worker-class
con la ruta a su trabajador:
python Manage.py rqworker alto valor predeterminado bajo --worker-class 'path.to.GeventWorker'
Para usar una clase de cola personalizada, puede pasar el indicador --queue-class
con la ruta a su clase de cola:
python Manage.py rqworker alto valor predeterminado bajo --queue-class 'path.to.CustomQueue'
Para utilizar una clase de trabajo personalizada, proporcione el indicador --job-class
.
A partir de la versión 2.10, también se admite la ejecución del grupo de trabajadores de RQ:
python Manage.py rqworker-pool valor predeterminado bajo medio --num-workers 4
Con RQ 1.2.0. puede utilizar el programador integrado para sus trabajos. Por ejemplo:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Si está utilizando un programador integrado, debe iniciar trabajadores con soporte de programador:
python administrar.py rqworker --con-programador
Alternativamente, puede utilizar RQ Scheduler. Después de la instalación, también puede usar la función get_scheduler
para devolver una instancia Scheduler
para las colas definidas en RQ_QUEUES
de settings.py. Por ejemplo:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
También puede utilizar el comando de administración rqscheduler
para iniciar el programador:
python administrar.py rqscheduler
Si tiene instalado django-redis o django-redis-cache, puede indicarle a django_rq que use la misma información de conexión de su caché de Redis. Esto tiene dos ventajas: es SECO y aprovecha cualquier optimización que pueda estar sucediendo en la configuración de su caché (como el uso de agrupaciones de conexiones o Hiredis).
Para configurarlo, use un dictado con la clave USE_REDIS_CACHE
apuntando al nombre del caché deseado en su dictado RQ_QUEUES
. No hace falta decir que el caché elegido debe existir y utilizar el backend de Redis. Consulte los documentos respectivos del paquete de caché de Redis para obtener instrucciones de configuración. También es importante señalar que dado que django-redis-cache ShardedClient
divide el caché en múltiples conexiones de Redis, no funciona.
Aquí hay un fragmento de configuración de ejemplo para django-redis:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
A veces es posible que desee suspender RQ para evitar que procese nuevos trabajos. Un ejemplo clásico es durante la fase inicial de un script de implementación o antes de poner su sitio en modo de mantenimiento. Esto es particularmente útil cuando tiene trabajos que duran relativamente mucho tiempo y que, de otro modo, podrían verse obligados a finalizar durante la implementación.
El comando suspender impide que los trabajadores de _todas_ las colas (en una única base de datos de Redis) seleccionen nuevos trabajos. Sin embargo, los trabajos actualmente en ejecución continuarán hasta su finalización.
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
también proporciona un panel para monitorear el estado de sus colas en /django-rq/
(o cualquier URL que establezca en su urls.py
durante la instalación.
También puede agregar un enlace a este enlace del panel en /admin
agregando RQ_SHOW_ADMIN_LINK = True
en settings.py
. Sin embargo, tenga cuidado, esto anulará la plantilla de administración predeterminada, por lo que puede interferir con otras aplicaciones que modifican la plantilla de administración predeterminada.
Estas estadísticas también están disponibles en formato JSON a través de /django-rq/stats.json
, al que pueden acceder los miembros del personal. Si necesita acceder a esta vista a través de otros clientes HTTP (para fines de monitoreo), puede definir RQ_API_TOKEN
y acceder a ella a través de /django-rq/stats.json/<API_TOKEN>
.
Nota: Las estadísticas de trabajos programados muestran trabajos del programador integrado de RQ, no del programador de RQ opcional.
Además, también se puede acceder a estas estadísticas desde la línea de comando.
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry debe configurarse dentro de Django settings.py
como se describe en los documentos de Sentry.
Puede anular la configuración predeterminada de Django Sentry al ejecutar el comando rqworker
pasando la opción sentry-dsn
:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
Esto anulará cualquier configuración existente de Django y reinicializará Sentry, configurando las siguientes opciones de Sentry:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ utiliza logging
de Python, esto significa que puede configurar fácilmente el mecanismo de registro de rqworker
en settings.py
de django. Por ejemplo:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
De forma predeterminada, cada cola utilizará la clase DjangoRQ
. Si desea utilizar una clase de cola personalizada, puede hacerlo agregando una opción QUEUE_CLASS
por cola en RQ_QUEUES
:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
o puede especificar DjangoRQ
para usar una clase personalizada para todas sus colas en la configuración RQ
:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
Las clases de cola personalizadas deben heredar de django_rq.queues.DjangoRQ
.
Si está utilizando más de una clase de cola (no recomendado), asegúrese de ejecutar trabajadores solo en colas con la misma clase de cola. Por ejemplo, si tiene dos colas definidas en RQ_QUEUES
y una tiene una clase personalizada especificada, deberá ejecutar al menos dos trabajadores separados para cada cola.
De manera similar a las clases de cola personalizadas, las clases de trabajadores y trabajos personalizados globales se pueden configurar usando las configuraciones JOB_CLASS
y WORKER_CLASS
:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
La clase de trabajo personalizada debe heredar de rq.job.Job
. Se utilizará para todos los trabajos si está configurado.
La clase de trabajador personalizada debe heredar de rq.worker.Worker
. Se utilizará para ejecutar todos los trabajadores a menos que lo anule la opción worker-class
del comando de administración rqworker
.
Para facilitar el proceso de prueba, puede ejecutar un trabajador sincrónicamente de esta manera:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
Puede configurar la opción ASYNC
en False
para que la operación sincrónica sea la predeterminada para una cola determinada. Esto hará que los trabajos se ejecuten inmediatamente y en el mismo hilo en el que se envían, lo que resulta útil para realizar pruebas y depurar. Por ejemplo, puede agregar lo siguiente después de configurar la cola en su archivo de configuración:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
Tenga en cuenta que configurar el parámetro is_async
explícitamente al llamar get_queue
anulará esta configuración.
Para ejecutar el conjunto de pruebas de django_rq
:
`cuál django-admin` prueba django_rq --settings=django_rq.tests.settings --pythonpath=.
Cree un servicio rqworker que ejecute las colas alta, predeterminada y baja.
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
Habilitar e iniciar el servicio.
sudo systemctl enable rqworker
sudo systemctl start rqworker
Agregue django-rq a su archivo require.txt con:
pip freeze > requirements.txt
Actualice su perfil a:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
Confirmar y volver a implementar. Luego agregue su nuevo trabajador con:
heroku scale worker=1
Consulte CHANGELOG.md.