Integração do Django com RQ, uma biblioteca de filas Python baseada em Redis. Django-RQ é um aplicativo simples que permite configurar suas filas no settings.py
do Django e usá-las facilmente em seu projeto.
Se você achar django-rq
útil, considere apoiar seu desenvolvimento via Tidelift.
django-rq
(ou baixe do PyPI): pip install django - rq
django_rq
a INSTALLED_APPS
em settings.py
: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
do Django: RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
django_rq.urls
em seu urls.py
: urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ permite que você coloque facilmente jobs em qualquer uma das filas definidas em settings.py
. Ele vem com algumas funções utilitárias:
enqueue
- envia um trabalho para a fila default
: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
– retorna uma instância Queue
. import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
Além do argumento name
, get_queue
também aceita os argumentos default_timeout
, is_async
, autocommit
, connection
e queue_class
. Por exemplo:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
Você pode fornecer seu próprio objeto de conexão Redis singleton para esta função para que ela não crie um novo objeto de conexão para cada definição de fila. Isso o ajudará a limitar o número de conexões com o servidor Redis. Por exemplo:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
- aceita um único argumento de nome de fila (o padrão é "default") e retorna uma conexão com o servidor Redis da fila: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
- aceita nomes de filas opcionais e retorna uma nova instância do RQ Worker
para filas especificadas (ou fila default
): import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
Para transformar facilmente uma tarefa callable em uma tarefa RQ, você também pode usar o decorador @job
que vem com django_rq
:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
Você pode passar quaisquer argumentos que o decorador de tarefas do RQ aceite:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
É possível especificar o padrão para o argumento da palavra-chave do decorador result_ttl
por meio da configuração DEFAULT_RESULT_TTL
:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
Com esta configuração, o decorador de tarefas definirá result_ttl
como 5000, a menos que seja especificado explicitamente.
django_rq fornece um comando de gerenciamento que inicia um trabalhador para cada fila especificada como argumentos:
python manager.py rqworker alto padrão baixo
Se você deseja executar rqworker
no modo burst, você pode passar o sinalizador --burst
:
python manager.py rqworker alto padrão baixo --burst
Se você precisar usar classes de trabalho, de trabalho ou de fila customizadas, é melhor usar configurações globais (consulte Classes de fila customizadas e Classes de trabalho e de trabalho customizadas). No entanto, também é possível substituir essas configurações pelas opções de linha de comando, como segue.
Para usar uma classe de trabalhador personalizada, você pode passar o sinalizador --worker-class
com o caminho para seu trabalhador:
python manager.py rqworker alto padrão baixo --worker-class 'path.to.GeventWorker'
Para usar uma classe de fila personalizada, você pode passar o sinalizador --queue-class
com o caminho para sua classe de fila:
python manager.py rqworker alto padrão baixo --queue-class 'path.to.CustomQueue'
Para usar uma classe de trabalho personalizada, forneça o sinalizador --job-class
.
A partir da versão 2.10, a execução do conjunto de trabalhadores do RQ também é suportada:
python manager.py rqworker-pool padrão baixo médio --num-workers 4
Com RQ 1.2.0. você pode usar o agendador integrado para seus trabalhos. Por exemplo:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Se você estiver usando o agendador integrado, deverá iniciar os trabalhadores com suporte ao agendador:
python manager.py rqworker --with-scheduler
Alternativamente, você pode usar o RQ Scheduler. Após a instalação, você também pode usar a função get_scheduler
para retornar uma instância Scheduler
para filas definidas em RQ_QUEUES
de settings.py. Por exemplo:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Também é possível usar o comando de gerenciamento rqscheduler
para iniciar o planejador:
python gerencia.py rqscheduler
Se você tiver django-redis ou django-redis-cache instalado, você pode instruir django_rq para usar as mesmas informações de conexão do seu cache Redis. Isso tem duas vantagens: é DRY e aproveita qualquer otimização que possa estar acontecendo na configuração do cache (como usar pool de conexões ou Hiredis).
Para configurá-lo, use um dict com a chave USE_REDIS_CACHE
apontando para o nome do cache desejado em seu dict RQ_QUEUES
. Nem é preciso dizer que o cache escolhido deve existir e usar o backend Redis. Consulte os respectivos documentos do pacote de cache Redis para obter instruções de configuração. Também é importante ressaltar que como o django-redis-cache ShardedClient
divide o cache em múltiplas conexões Redis, ele não funciona.
Aqui está um exemplo de fragmento de configurações para Django-redis:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
Às vezes você pode querer suspender o RQ para evitar que ele processe novos trabalhos. Um exemplo clássico é durante a fase inicial de um script de implantação ou antes de colocar seu site em modo de manutenção. Isso é particularmente útil quando você tem trabalhos de execução relativamente longa e que, de outra forma, poderiam ser eliminados à força durante a implantação.
O comando suspend impede que trabalhadores em _todas_ as filas (em um único banco de dados Redis) escolham novos trabalhos. No entanto, os trabalhos em execução continuarão até a conclusão.
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
também fornece um painel para monitorar o status de suas filas em /django-rq/
(ou qualquer URL que você definir em seu urls.py
durante a instalação.
Você também pode adicionar um link para este link do painel em /admin
adicionando RQ_SHOW_ADMIN_LINK = True
em settings.py
. Porém, tenha cuidado, pois isso substituirá o modelo de administração padrão, podendo interferir em outros aplicativos que modificam o modelo de administração padrão.
Essas estatísticas também estão disponíveis no formato JSON via /django-rq/stats.json
, que é acessível aos membros da equipe. Se precisar acessar esta visualização através de outros clientes HTTP (para fins de monitoramento), você pode definir RQ_API_TOKEN
e acessá-la via /django-rq/stats.json/<API_TOKEN>
.
Nota: As estatísticas de trabalhos agendados exibem trabalhos do agendador integrado do RQ, e não do agendador RQ opcional.
Além disso, essas estatísticas também podem ser acessadas na linha de comando.
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
O Sentry deve ser configurado no Django settings.py
conforme descrito na documentação do Sentry.
Você pode substituir a configuração padrão do Django Sentry ao executar o comando rqworker
passando a opção sentry-dsn
:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
Isto irá substituir qualquer configuração existente do Django e reinicializar o Sentry, definindo as seguintes opções do Sentry:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ usa logging
do Python, isso significa que você pode configurar facilmente o mecanismo de log do rqworker
no settings.py
do Django. Por exemplo:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
Por padrão, cada fila usará a classe DjangoRQ
. Se quiser usar uma classe de fila personalizada, você pode fazer isso adicionando uma opção QUEUE_CLASS
por fila em RQ_QUEUES
:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
ou você pode especificar DjangoRQ
para usar uma classe personalizada para todas as suas filas nas configurações RQ
:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
Classes de fila customizadas devem herdar de django_rq.queues.DjangoRQ
.
Se você estiver usando mais de uma classe de fila (não recomendado), certifique-se de executar somente trabalhadores em filas com a mesma classe de fila. Por exemplo, se você tiver duas filas definidas em RQ_QUEUES
e uma tiver uma classe customizada especificada, será necessário executar pelo menos dois trabalhadores separados para cada fila.
Da mesma forma que as classes de fila customizadas, as classes de trabalho e de trabalho customizadas globais podem ser configuradas usando as configurações JOB_CLASS
e WORKER_CLASS
:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
A classe de trabalho personalizada deve herdar de rq.job.Job
. Ele será usado para todos os trabalhos, se configurado.
A classe de trabalho personalizada deve herdar de rq.worker.Worker
. Ele será usado para executar todos os trabalhadores, a menos que seja substituído pela opção worker-class
do comando de gerenciamento rqworker
.
Para um processo de teste mais fácil, você pode executar um trabalhador de forma síncrona desta forma:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
Você pode definir a opção ASYNC
como False
para tornar a operação síncrona o padrão para uma determinada fila. Isso fará com que os trabalhos sejam executados imediatamente e no mesmo thread em que foram despachados, o que é útil para testes e depuração. Por exemplo, você pode adicionar o seguinte depois de enfileirar a configuração em seu arquivo de configurações:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
Observe que definir explicitamente o parâmetro is_async
ao chamar get_queue
substituirá essa configuração.
Para executar o conjunto de testes do django_rq
:
`qual django-admin` testa django_rq --settings=django_rq.tests.settings --pythonpath=.
Crie um serviço rqworker que execute as filas alta, padrão e baixa.
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
Habilite e inicie o serviço
sudo systemctl enable rqworker
sudo systemctl start rqworker
Adicione django-rq ao seu arquivo requirements.txt com:
pip freeze > requirements.txt
Atualize seu perfil para:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
Confirme e reimplante. Em seguida, adicione seu novo trabalhador com:
heroku scale worker=1
Consulte CHANGELOG.md.