Django 與 RQ 集成,RQ 是一個基於 Redis 的 Python 佇列庫。 Django-RQ 是一個簡單的應用程序,可讓您在 django 的settings.py
中配置佇列並在專案中輕鬆使用它們。
如果您發現django-rq
有用,請考慮透過 Tidelift 支援其開發。
django-rq
(或從 PyPI 下載): pip install django - rq
django_rq
加入settings.py
中的INSTALLED_APPS
中: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
中配置佇列: RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
urls.py
中包含django_rq.urls
: urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ 可讓您輕鬆地將作業放入settings.py
中定義的任何佇列中。它帶有一些實用功能:
enqueue
- 將作業推送到default
佇列: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
- 傳回一個Queue
實例。 import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
除了name
參數之外, get_queue
還接受default_timeout
、 is_async
、 autocommit
、 connection
和queue_class
參數。例如:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
您可以向此函數提供您自己的單例 Redis 連接對象,這樣它就不會為每個隊列定義建立新的連接對象。這將幫助您限制與 Redis 伺服器的連線數量。例如:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
- 接受單一佇列名稱參數(預設為「default」)並傳回至佇列的 Redis 伺服器的連線: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
- 接受可選佇列名稱並傳回指定佇列(或default
佇列)的新 RQ Worker
實例: import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
要輕鬆地將可調用任務轉換為 RQ 任務,您也可以使用django_rq
附帶的@job
裝飾器:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
您可以傳入 RQ 的作業裝飾器所接受的任何參數:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
可以透過DEFAULT_RESULT_TTL
設定指定result_ttl
裝飾器關鍵字參數的預設值:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
使用此設置,作業裝飾器會將result_ttl
設為 5000,除非明確指定。
django_rq 提供了一個管理指令,為每個指定為參數的佇列啟動一個工作進程:
python manage.py rqworker 高預設低
如果你想在突發模式下執行rqworker
,你可以傳入--burst
標誌:
python manage.py rqworker 高預設低--burst
如果您需要使用自訂工作程序、作業或佇列類,最好使用全域設定(請參閱自訂佇列類別和自訂作業和工作程式類別)。但是,也可以使用命令列選項覆蓋此類設置,如下所示。
若要使用自訂工作程序類,您可以傳入--worker-class
標誌以及工作程序的路徑:
python manage.py rqworker 高 default low --worker-class 'path.to.GeventWorker'
若要使用自訂佇列類,您可以傳入--queue-class
標誌以及佇列類別的路徑:
python manage.py rqworker 高預設低 --queue-class 'path.to.CustomQueue'
若要使用自訂作業類,請提供--job-class
標誌。
從2.10版本開始,也支援執行RQ的worker-pool:
python manage.py rqworker-pool預設低中--num-workers 4
使用 RQ 1.2.0。您可以使用內建的排程器來完成您的作業。例如:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
如果您使用內建調度程序,則必須啟動具有調度程序支援的工作程序:
python manage.py rqworker --with-scheduler
或者,您可以使用 RQ Scheduler。安裝後,您也可以使用get_scheduler
函數為 settings.py 的RQ_QUEUES
中定義的佇列傳回Scheduler
實例。例如:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
您也可以使用管理命令rqscheduler
來啟動調度程式:
python 管理.py rqscheduler
如果您安裝了 django-redis 或 django-redis-cache,您可以指示 django_rq 使用 Redis 快取中的相同連線資訊。這有兩個優點:它是 DRY,並且它利用了快取設定中可能進行的任何最佳化(例如使用連接池或 Hiredis)。
若要使用配置它,請使用字典,其鍵USE_REDIS_CACHE
指向RQ_QUEUES
字典中所需快取的名稱。不用說,所選的快取必須存在並且使用 Redis 後端。有關配置說明,請參閱各自的 Redis 快取包文件。還需要指出的是,由於 django-redis-cache ShardedClient
將快取拆分到多個 Redis 連接上,因此它不起作用。
這是 django-redis 的範例設定片段:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
有時您可能希望暫停 RQ 以防止其處理新作業。一個典型的範例是在部署腳本的初始階段或將網站置於維護模式之前。當您的作業運行時間相對較長並且可能在部署期間強制終止時,這特別有用。
掛起指令會阻止所有佇列(在單一 Redis 資料庫中)上的工作執行緒拾取新作業。但是,目前正在運行的作業將繼續直至完成。
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
還提供了一個儀表板來監視/django-rq/
中的佇列狀態(或您在安裝過程中在urls.py
中設定的任何 URL)。
您也可以透過在settings.py
中新增RQ_SHOW_ADMIN_LINK = True
來在/admin
中新增指向此儀表板連結的連結。但請注意,這將覆蓋預設管理模板,因此可能會幹擾修改預設管理模板的其他應用程式。
這些統計資料也可以透過/django-rq/stats.json
以 JSON 格式提供,工作人員可以存取。如果您需要透過其他 HTTP 用戶端存取此視圖(用於監控目的),您可以定義RQ_API_TOKEN
並透過/django-rq/stats.json/<API_TOKEN>
存取它。
注意:排程作業的統計資訊顯示來自 RQ 內建排程器的作業,而不是可選的 RQ 排程器。
此外,還可以從命令列存取這些統計資訊。
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry 應在 Django settings.py
中配置,如 Sentry 文件中所述。
您可以在執行rqworker
命令時透過傳遞sentry-dsn
選項來覆寫預設的 Django Sentry 設定:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
這將覆寫任何現有的 Django 配置並重新初始化 Sentry,設定以下 Sentry 選項:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ 使用Python 的logging
,這表示您可以在django 的settings.py
中輕鬆設定rqworker
的日誌記錄機制。例如:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
預設情況下,每個佇列都將使用DjangoRQ
類別。如果您想要使用自訂佇列類,可以透過在RQ_QUEUES
中為每個佇列新增QUEUE_CLASS
選項來實現:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
或者您可以指定DjangoRQ
在RQ
設定中為所有佇列使用自訂類別:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
自訂佇列類別應該繼承自django_rq.queues.DjangoRQ
。
如果您使用多個佇列類別(不建議),請確保僅在具有相同佇列類別的佇列上執行工作執行緒。例如,如果您在RQ_QUEUES
中定義了兩個佇列,並且其中一個佇列指定了自訂類,則您必須為每個佇列至少執行兩個單獨的工作執行緒。
與自訂佇列類別類似,可以使用JOB_CLASS
和WORKER_CLASS
設定來設定全域自訂作業和工作執行緒類別:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
自訂作業類別應繼承自rq.job.Job
。如果配置,它將用於所有作業。
自訂工作器類別應繼承自rq.worker.Worker
。它將用於運行所有工作程序,除非被rqworker
管理命令worker-class
選項覆蓋。
為了更輕鬆地測試過程,您可以透過以下方式同步執行工作執行緒:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
您可以將選項ASYNC
設定為False
以使同步操作成為給定佇列的預設操作。這將導致作業在調度時立即在同一執行緒上執行,這對於測試和偵錯非常有用。例如,您可以在設定檔中對組態進行排隊後新增以下內容:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
請注意,在呼叫get_queue
時明確設定is_async
參數將覆蓋此設定。
運行django_rq
的測試套件:
`which django-admin` 測試 django_rq --settings=django_rq.tests.settings --pythonpath=.
建立運行高、預設和低隊列的 rqworker 服務。
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
啟用並啟動服務
sudo systemctl enable rqworker
sudo systemctl start rqworker
將 django-rq 加入您的requirements.txt 檔案中:
pip freeze > requirements.txt
將您的 Procfile 更新為:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
提交並重新部署。然後新增您的新員工:
heroku scale worker=1
請參閱 CHANGELOG.md。