Django 与 RQ 集成,RQ 是一个基于 Redis 的 Python 队列库。 Django-RQ 是一个简单的应用程序,允许您在 django 的settings.py
中配置队列并在项目中轻松使用它们。
如果您发现django-rq
有用,请考虑通过 Tidelift 支持其开发。
django-rq
(或从 PyPI 下载): pip install django - rq
django_rq
添加到settings.py
中的INSTALLED_APPS
中: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
中配置队列: RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
urls.py
中包含django_rq.urls
: urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ 允许您轻松地将作业放入settings.py
中定义的任何队列中。它带有一些实用功能:
enqueue
- 将作业推送到default
队列: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
- 返回一个Queue
实例。 import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
除了name
参数之外, get_queue
还接受default_timeout
、 is_async
、 autocommit
、 connection
和queue_class
参数。例如:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
您可以向此函数提供您自己的单例 Redis 连接对象,这样它就不会为每个队列定义创建新的连接对象。这将帮助您限制与 Redis 服务器的连接数量。例如:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
- 接受单个队列名称参数(默认为“default”)并返回到队列的 Redis 服务器的连接: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
- 接受可选队列名称并返回指定队列(或default
队列)的新 RQ Worker
实例: import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
要轻松地将可调用任务转换为 RQ 任务,您还可以使用django_rq
附带的@job
装饰器:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
您可以传入 RQ 的作业装饰器接受的任何参数:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
可以通过DEFAULT_RESULT_TTL
设置指定result_ttl
装饰器关键字参数的默认值:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
使用此设置,作业装饰器会将result_ttl
设置为 5000,除非明确指定。
django_rq 提供了一个管理命令,为每个指定为参数的队列启动一个工作进程:
python manage.py rqworker 高默认低
如果你想在突发模式下运行rqworker
,你可以传入--burst
标志:
python manage.py rqworker 高默认低--burst
如果您需要使用自定义工作程序、作业或队列类,最好使用全局设置(请参阅自定义队列类和自定义作业和工作程序类)。但是,也可以使用命令行选项覆盖此类设置,如下所示。
要使用自定义工作程序类,您可以传入--worker-class
标志以及工作程序的路径:
python manage.py rqworker high default low --worker-class 'path.to.GeventWorker'
要使用自定义队列类,您可以传入--queue-class
标志以及队列类的路径:
python manage.py rqworker 高默认低 --queue-class 'path.to.CustomQueue'
要使用自定义作业类,请提供--job-class
标志。
从2.10版本开始,也支持运行RQ的worker-pool:
python manage.py rqworker-pool默认低中--num-workers 4
使用 RQ 1.2.0。您可以使用内置的调度程序来完成您的作业。例如:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
如果您使用内置调度程序,则必须启动具有调度程序支持的工作程序:
python manage.py rqworker --with-scheduler
或者,您可以使用 RQ Scheduler。安装后,您还可以使用get_scheduler
函数为 settings.py 的RQ_QUEUES
中定义的队列返回Scheduler
实例。例如:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
您还可以使用管理命令rqscheduler
来启动调度程序:
python 管理.py rqscheduler
如果您安装了 django-redis 或 django-redis-cache,您可以指示 django_rq 使用 Redis 缓存中的相同连接信息。这有两个优点:它是 DRY,并且它利用了缓存设置中可能进行的任何优化(例如使用连接池或 Hiredis)。
要使用配置它,请使用一个字典,其键USE_REDIS_CACHE
指向RQ_QUEUES
字典中所需缓存的名称。不用说,所选的缓存必须存在并且使用 Redis 后端。有关配置说明,请参阅各自的 Redis 缓存包文档。还需要指出的是,由于 django-redis-cache ShardedClient
将缓存拆分到多个 Redis 连接上,因此它不起作用。
这是 django-redis 的示例设置片段:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
有时您可能希望暂停 RQ 以防止其处理新作业。一个典型的示例是在部署脚本的初始阶段或将站点置于维护模式之前。当您的作业运行时间相对较长并且可能在部署期间被强制终止时,这特别有用。
挂起命令会阻止所有队列(在单个 Redis 数据库中)上的工作线程拾取新作业。但是,当前正在运行的作业将继续直至完成。
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
还提供了一个仪表板来监视/django-rq/
中的队列状态(或者您在安装过程中在urls.py
中设置的任何 URL)。
您还可以通过在settings.py
中添加RQ_SHOW_ADMIN_LINK = True
来在/admin
中添加指向此仪表板链接的链接。但请注意,这将覆盖默认管理模板,因此可能会干扰修改默认管理模板的其他应用程序。
这些统计信息还可以通过/django-rq/stats.json
以 JSON 格式提供,工作人员可以访问。如果您需要通过其他 HTTP 客户端访问此视图(用于监控目的),您可以定义RQ_API_TOKEN
并通过/django-rq/stats.json/<API_TOKEN>
访问它。
注意:计划作业的统计信息显示来自 RQ 内置调度程序的作业,而不是可选的 RQ 调度程序。
此外,还可以从命令行访问这些统计信息。
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry 应在 Django settings.py
中配置,如 Sentry 文档中所述。
您可以在运行rqworker
命令时通过传递sentry-dsn
选项来覆盖默认的 Django Sentry 配置:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
这将覆盖任何现有的 Django 配置并重新初始化 Sentry,设置以下 Sentry 选项:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ 使用Python 的logging
,这意味着您可以在django 的settings.py
中轻松配置rqworker
的日志记录机制。例如:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
默认情况下,每个队列都将使用DjangoRQ
类。如果您想使用自定义队列类,可以通过在RQ_QUEUES
中为每个队列添加QUEUE_CLASS
选项来实现:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
或者您可以指定DjangoRQ
在RQ
设置中为所有队列使用自定义类:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
自定义队列类应该继承自django_rq.queues.DjangoRQ
。
如果您使用多个队列类(不推荐),请确保仅在具有相同队列类的队列上运行工作线程。例如,如果您在RQ_QUEUES
中定义了两个队列,并且其中一个队列指定了自定义类,则您必须为每个队列至少运行两个单独的工作线程。
与自定义队列类类似,可以使用JOB_CLASS
和WORKER_CLASS
设置来配置全局自定义作业和工作线程类:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
自定义作业类应继承自rq.job.Job
。如果配置,它将用于所有作业。
自定义工作器类应继承自rq.worker.Worker
。它将用于运行所有工作程序,除非被rqworker
管理命令worker-class
选项覆盖。
为了更轻松地测试过程,您可以通过以下方式同步运行工作线程:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
您可以将选项ASYNC
设置为False
以使同步操作成为给定队列的默认操作。这将导致作业在调度时立即在同一线程上执行,这对于测试和调试非常有用。例如,您可以在设置文件中对配置进行排队后添加以下内容:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
请注意,在调用get_queue
时显式设置is_async
参数将覆盖此设置。
运行django_rq
的测试套件:
`which django-admin` 测试 django_rq --settings=django_rq.tests.settings --pythonpath=.
创建运行高、默认和低队列的 rqworker 服务。
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
启用并启动服务
sudo systemctl enable rqworker
sudo systemctl start rqworker
将 django-rq 添加到您的requirements.txt 文件中:
pip freeze > requirements.txt
将您的 Procfile 更新为:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
提交并重新部署。然后添加您的新员工:
heroku scale worker=1
请参阅 CHANGELOG.md。