การรวม Django กับ RQ ซึ่งเป็นไลบรารีการจัดคิว Python ที่ใช้ Redis Django-RQ เป็นแอปเรียบง่ายที่ให้คุณกำหนดค่าคิวใน settings.py
ของ django และใช้ในโปรเจ็กต์ของคุณได้อย่างง่ายดาย
หากคุณพบว่า django-rq
มีประโยชน์ โปรดพิจารณาสนับสนุนการพัฒนาผ่าน Tidelift
django-rq
(หรือดาวน์โหลดจาก PyPI): pip install django - rq
django_rq
ไปที่ INSTALLED_APPS
ใน settings.py
: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
ของ django : RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
django_rq.urls
ใน urls.py
ของคุณ : urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ ช่วยให้คุณสามารถใส่งานลงในคิวใดๆ ที่กำหนดไว้ใน settings.py
ได้อย่างง่ายดาย มันมาพร้อมกับฟังก์ชั่นยูทิลิตี้บางอย่าง:
enqueue
- ดันงานไปที่คิว default
: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
- ส่งคืนอินสแตนซ์ Queue
import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
นอกจากอาร์กิวเมนต์ name
แล้ว get_queue
ยังยอมรับอาร์กิวเมนต์ default_timeout
, is_async
, autocommit
, connection
และ queue_class
ตัวอย่างเช่น:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
คุณสามารถจัดเตรียมออบเจ็กต์การเชื่อมต่อ Redis ซิงเกิลของคุณเองให้กับฟังก์ชันนี้ เพื่อไม่ให้สร้างออบเจ็กต์การเชื่อมต่อใหม่สำหรับแต่ละคำจำกัดความของคิว สิ่งนี้จะช่วยคุณจำกัดจำนวนการเชื่อมต่อไปยังเซิร์ฟเวอร์ Redis ตัวอย่างเช่น:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
- ยอมรับอาร์กิวเมนต์ชื่อคิวเดียว (ค่าเริ่มต้นเป็น "ค่าเริ่มต้น") และส่งคืนการเชื่อมต่อกับเซิร์ฟเวอร์ Redis ของคิว: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
- ยอมรับชื่อคิวทางเลือกและส่งคืนอินสแตนซ์ RQ Worker
ใหม่สำหรับคิวที่ระบุ (หรือคิว default
): import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
หากต้องการเปลี่ยน callable ให้เป็นงาน RQ ได้อย่างง่ายดาย คุณสามารถใช้ @job
decorator ที่มาพร้อมกับ django_rq
:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
คุณสามารถส่งข้อโต้แย้งใดๆ ที่ผู้ตกแต่งงานของ RQ ยอมรับได้:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
เป็นไปได้ที่จะระบุค่าเริ่มต้นสำหรับอาร์กิวเมนต์คำหลักมัณฑนากร result_ttl
ผ่านการตั้งค่า DEFAULT_RESULT_TTL
:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
ด้วยการตั้งค่านี้ ผู้ตกแต่งงานจะตั้ง result_ttl
เป็น 5000 เว้นแต่จะระบุไว้อย่างชัดเจน
django_rq จัดเตรียมคำสั่งการจัดการที่เริ่มต้นผู้ปฏิบัติงานสำหรับทุกคิวที่ระบุเป็นอาร์กิวเมนต์:
หลาม Manage.py rqworker สูง ค่าเริ่มต้นต่ำ
หากคุณต้องการรัน rqworker
ในโหมดเบิร์สต์ คุณสามารถส่งผ่านแฟล็ก --burst
ได้:
python Manage.py rqworker ค่าเริ่มต้นสูง ต่ำ --burst
หากคุณต้องการใช้คลาสผู้ปฏิบัติงาน งาน หรือคิวแบบกำหนดเอง วิธีที่ดีที่สุดคือใช้การตั้งค่าส่วนกลาง (ดูคลาสคิวแบบกำหนดเอง และคลาสงานและผู้ปฏิบัติงานแบบกำหนดเอง) อย่างไรก็ตาม คุณยังสามารถแทนที่การตั้งค่าดังกล่าวด้วยตัวเลือกบรรทัดคำสั่งดังต่อไปนี้
หากต้องการใช้คลาสผู้ปฏิบัติงานที่กำหนดเอง คุณสามารถส่งผ่านแฟล็ก --worker-class
พร้อมพาธไปยังผู้ปฏิบัติงานของคุณ:
python Manage.py rqworker ค่าเริ่มต้นสูงต่ำ --worker-class 'path.to.GeventWorker'
หากต้องการใช้คลาสคิวแบบกำหนดเอง คุณสามารถส่งผ่านแฟล็ก --queue-class
พร้อมพาธไปยังคลาสคิวของคุณ:
python Manage.py rqworker ค่าเริ่มต้นสูงต่ำ --queue-class 'path.to.CustomQueue'
หากต้องการใช้คลาสงานที่กำหนดเอง ให้จัดเตรียมแฟล็ก --job-class
ตั้งแต่เวอร์ชัน 2.10 เป็นต้นไป การรันพูลผู้ปฏิบัติงานของ RQ ก็ได้รับการสนับสนุนเช่นกัน:
python Manage.py rqworker-pool ค่าเริ่มต้นต่ำปานกลาง --num-workers 4
ด้วย RQ 1.2.0 คุณสามารถใช้ตัวกำหนดเวลาในตัวสำหรับงานของคุณได้ ตัวอย่างเช่น:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
หากคุณใช้ตัวจัดกำหนดการในตัว คุณต้องเริ่มต้นผู้ปฏิบัติงานด้วยการสนับสนุนตัวจัดกำหนดการ:
หลาม Manage.py rqworker --with-scheduler
หรือคุณสามารถใช้ RQ Scheduler ได้ หลังการติดตั้ง คุณยังสามารถใช้ฟังก์ชัน get_scheduler
เพื่อส่งคืนอินสแตนซ์ Scheduler
สำหรับคิวที่กำหนดใน RQ_QUEUES
ของ settings.py ตัวอย่างเช่น:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
คุณยังสามารถใช้คำสั่งการจัดการ rqscheduler
เพื่อเริ่มตัวกำหนดตารางเวลา:
หลาม Manage.py rqscheduler
หากคุณติดตั้ง django-redis หรือ django-redis-cache คุณสามารถสั่งให้ django_rq ใช้ข้อมูลการเชื่อมต่อเดียวกันจากแคช Redis ของคุณได้ มีข้อดีสองประการ: เป็นแบบแห้ง และใช้ประโยชน์จากการปรับให้เหมาะสมใดๆ ที่อาจเกิดขึ้นในการตั้งค่าแคชของคุณ (เช่น การใช้การรวมการเชื่อมต่อหรือ Hiredis)
หากต้องการใช้การกำหนดค่า ให้ใช้ dict พร้อมคีย์ USE_REDIS_CACHE
ที่ชี้ไปที่ชื่อของแคชที่ต้องการใน dict RQ_QUEUES
ของคุณ ดำเนินไปโดยไม่ได้บอกว่าต้องมีแคชที่เลือกอยู่และใช้แบ็กเอนด์ Redis ดูเอกสารแพ็คเกจแคช Redis ที่เกี่ยวข้องเพื่อดูคำแนะนำในการกำหนดค่า สิ่งสำคัญคือต้องชี้ให้เห็นว่าเนื่องจาก django-redis-cache ShardedClient
แยกแคชผ่านการเชื่อมต่อ Redis หลายรายการ จึงไม่ทำงาน
นี่คือตัวอย่างการตั้งค่าส่วนสำหรับ django-redis:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
บางครั้งคุณอาจต้องการระงับ RQ เพื่อป้องกันไม่ให้ประมวลผลงานใหม่ ตัวอย่างคลาสสิกคือในระหว่างระยะเริ่มต้นของสคริปต์การปรับใช้หรือก่อนที่เว็บไซต์ของคุณจะเข้าสู่โหมดการบำรุงรักษา สิ่งนี้มีประโยชน์อย่างยิ่งเมื่อคุณมีงานที่ค่อนข้างใช้เวลานาน และอาจถูกบังคับให้หยุดทำงานในระหว่างการปรับใช้
คำสั่ง Suspend จะหยุดผู้ปฏิบัติงานบนคิว _all_ (ในฐานข้อมูล Redis เดียว) ไม่ให้รับงานใหม่ อย่างไรก็ตาม งานที่กำลังดำเนินการอยู่จะยังคงดำเนินต่อไปจนกว่าจะแล้วเสร็จ
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
ยังมีแดชบอร์ดสำหรับตรวจสอบสถานะของคิวของคุณที่ /django-rq/
(หรือ URL ใดก็ตามที่คุณตั้งไว้ใน urls.py
ระหว่างการติดตั้ง
คุณยังสามารถเพิ่มลิงก์ไปยังลิงก์แดชบอร์ดนี้ได้ใน /admin
โดยเพิ่ม RQ_SHOW_ADMIN_LINK = True
ใน settings.py
แต่ระวัง การดำเนินการนี้จะแทนที่เทมเพลตผู้ดูแลระบบเริ่มต้น ดังนั้นจึงอาจรบกวนแอปอื่นๆ ที่แก้ไขเทมเพลตผู้ดูแลระบบเริ่มต้น
สถิติเหล่านี้ยังมีอยู่ในรูปแบบ JSON ผ่าน /django-rq/stats.json
ซึ่งเจ้าหน้าที่สามารถเข้าถึงได้ หากคุณต้องการเข้าถึงมุมมองนี้ผ่านไคลเอนต์ HTTP อื่น ๆ (เพื่อวัตถุประสงค์ในการตรวจสอบ) คุณสามารถกำหนด RQ_API_TOKEN
และเข้าถึงผ่านทาง /django-rq/stats.json/<API_TOKEN>
หมายเหตุ: สถิติของงานที่กำหนดเวลาไว้จะแสดงงานจากตัวกำหนดเวลาในตัว RQ ไม่ใช่ตัวกำหนดเวลา RQ เสริม
นอกจากนี้ สถิติเหล่านี้ยังสามารถเข้าถึงได้จากบรรทัดคำสั่งอีกด้วย
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry ควรได้รับการกำหนดค่าภายใน Django settings.py
ตามที่อธิบายไว้ในเอกสาร Sentry
คุณสามารถแทนที่การกำหนดค่าเริ่มต้นของ Django Sentry ได้เมื่อรันคำสั่ง rqworker
โดยส่งตัวเลือก sentry-dsn
:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
สิ่งนี้จะแทนที่การกำหนดค่า Django ที่มีอยู่และเริ่มต้น Sentry ใหม่ โดยตั้งค่าตัวเลือก Sentry ต่อไปนี้:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ ใช้ logging
ของ Python ซึ่งหมายความว่าคุณสามารถกำหนดค่ากลไกการบันทึกของ rqworker
ใน settings.py
ของ django ได้อย่างง่ายดาย ตัวอย่างเช่น:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
ตามค่าเริ่มต้น ทุกคิวจะใช้คลาส DjangoRQ
หากคุณต้องการใช้คลาสคิวแบบกำหนดเอง คุณสามารถทำได้โดยการเพิ่มตัวเลือก QUEUE_CLASS
ในแต่ละคิวใน RQ_QUEUES
:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
หรือคุณสามารถระบุ DjangoRQ
เพื่อใช้คลาสที่กำหนดเองสำหรับคิวทั้งหมดของคุณในการตั้งค่า RQ
:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
คลาสคิวแบบกำหนดเองควรสืบทอดมาจาก django_rq.queues.DjangoRQ
หากคุณใช้คลาสคิวมากกว่าหนึ่งคลาส (ไม่แนะนำ) ต้องแน่ใจว่ารันเฉพาะผู้ปฏิบัติงานบนคิวที่มีคลาสคิวเดียวกันเท่านั้น ตัวอย่างเช่น หากคุณมีสองคิวที่กำหนดไว้ใน RQ_QUEUES
และอีกหนึ่งคิวมีคลาสที่กำหนดเอง คุณจะต้องรันผู้ปฏิบัติงานแยกกันอย่างน้อยสองคนสำหรับแต่ละคิว
ในทำนองเดียวกันกับคลาสคิวที่กำหนดเอง งานที่กำหนดเองทั่วโลกและคลาสผู้ปฏิบัติงานสามารถกำหนดค่าได้โดยใช้การตั้งค่า JOB_CLASS
และ WORKER_CLASS
:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
คลาสงานแบบกำหนดเองควรสืบทอดจาก rq.job.Job
มันจะถูกใช้สำหรับงานทั้งหมดหากกำหนดค่าไว้
คลาสผู้ปฏิบัติงานแบบกำหนดเองควรสืบทอดมาจาก rq.worker.Worker
มันจะถูกใช้สำหรับการรันผู้ปฏิบัติงานทั้งหมด เว้นแต่จะถูกแทนที่โดยตัวเลือก worker-class
ของคำสั่งการจัดการ rqworker
เพื่อให้กระบวนการทดสอบง่ายขึ้น คุณสามารถเรียกใช้ผู้ปฏิบัติงานพร้อมกันได้ด้วยวิธีนี้:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
คุณสามารถตั้งค่าตัวเลือก ASYNC
เป็น False
เพื่อให้การดำเนินการแบบซิงโครนัสเป็นค่าเริ่มต้นสำหรับคิวที่กำหนด สิ่งนี้จะทำให้งานดำเนินการทันทีและบนเธรดเดียวกันกับที่จัดส่ง ซึ่งมีประโยชน์สำหรับการทดสอบและการดีบัก ตัวอย่างเช่น คุณอาจเพิ่มสิ่งต่อไปนี้หลังจากที่คุณจัดคิวการกำหนดค่าในไฟล์การตั้งค่าของคุณ:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
โปรดทราบว่าการตั้งค่าพารามิเตอร์ is_async
อย่างชัดเจนเมื่อเรียกใช้ get_queue
จะแทนที่การตั้งค่านี้
หากต้องการรันชุดทดสอบของ django_rq
:
`ซึ่ง django-admin` ทดสอบ django_rq --settings=django_rq.tests.settings --pythonpath=
สร้างบริการ rqworker ที่รันคิวสูง ค่าเริ่มต้น และต่ำ
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
เปิดใช้งานและเริ่มบริการ
sudo systemctl enable rqworker
sudo systemctl start rqworker
เพิ่ม django-rq ลงในไฟล์ needs.txt ของคุณด้วย:
pip freeze > requirements.txt
อัปเดต Procfile ของคุณเป็น:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
คอมมิตและปรับใช้ใหม่ จากนั้นเพิ่มผู้ปฏิบัติงานใหม่ของคุณด้วย:
heroku scale worker=1
ดู CHANGELOG.md