Integrasi Django dengan RQ, perpustakaan antrian Python berbasis Redis. Django-RQ adalah aplikasi sederhana yang memungkinkan Anda untuk mengkonfigurasi antrian Anda di settings.py
Django dan dengan mudah menggunakannya dalam proyek Anda.
Jika Anda merasa django-rq
berguna, mohon pertimbangkan untuk mendukung pengembangannya melalui Tidelift.
django-rq
(atau unduh dari PyPI): pip install django - rq
django_rq
ke INSTALLED_APPS
di settings.py
: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
Django : RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
django_rq.urls
di urls.py
Anda : urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ memungkinkan Anda dengan mudah memasukkan pekerjaan ke dalam antrian mana pun yang ditentukan di settings.py
. Muncul dengan beberapa fungsi utilitas:
enqueue
- mendorong pekerjaan ke antrian default
: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
- mengembalikan instance Queue
. import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
Selain argumen name
, get_queue
juga menerima argumen default_timeout
, is_async
, autocommit
, connection
dan queue_class
. Misalnya:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
Anda dapat menyediakan objek koneksi Redis tunggal Anda sendiri ke fungsi ini sehingga tidak akan membuat objek koneksi baru untuk setiap definisi antrean. Ini akan membantu Anda membatasi jumlah koneksi ke server Redis. Misalnya:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
- menerima argumen nama antrean tunggal (defaultnya adalah "default") dan mengembalikan koneksi ke server Redis antrean: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
- menerima nama antrian opsional dan mengembalikan instance RQ Worker
baru untuk antrian tertentu (atau antrian default
): import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
Untuk dengan mudah mengubah callable menjadi tugas RQ, Anda juga dapat menggunakan dekorator @job
yang disertakan dengan django_rq
:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
Anda dapat menyampaikan argumen apa pun yang diterima oleh dekorator pekerjaan RQ:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
Dimungkinkan untuk menentukan default untuk argumen kata kunci dekorator result_ttl
melalui pengaturan DEFAULT_RESULT_TTL
:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
Dengan pengaturan ini, dekorator pekerjaan akan menyetel result_ttl
ke 5000 kecuali ditentukan secara eksplisit.
Django_rq menyediakan perintah manajemen yang memulai pekerja untuk setiap antrian yang ditentukan sebagai argumen:
python manage.py rqworker tinggi default rendah
Jika Anda ingin menjalankan rqworker
dalam mode burst, Anda dapat meneruskan flag --burst
:
python kelola.py rqworker tinggi default rendah --meledak
Jika Anda perlu menggunakan kelas pekerja, pekerjaan, atau antrian khusus, yang terbaik adalah menggunakan pengaturan global (lihat Kelas antrian khusus dan Kelas pekerjaan dan pekerja khusus). Namun, dimungkinkan juga untuk mengganti pengaturan tersebut dengan opsi baris perintah sebagai berikut.
Untuk menggunakan kelas pekerja khusus, Anda dapat meneruskan tanda --worker-class
dengan jalur ke pekerja Anda:
python kelola.py rqworker tinggi default rendah --kelas pekerja 'path.to.GeventWorker'
Untuk menggunakan kelas antrean khusus, Anda dapat meneruskan tanda --queue-class
dengan jalur ke kelas antrean Anda:
python kelola.py rqworker tinggi default rendah --kelas antrian 'path.to.CustomQueue'
Untuk menggunakan kelas pekerjaan khusus, berikan tanda --job-class
.
Mulai dari versi 2.10, menjalankan kumpulan pekerja RQ juga didukung:
python kelola.py rqworker-pool default rendah menengah --num-workers 4
Dengan RQ 1.2.0. Anda dapat menggunakan penjadwal bawaan untuk pekerjaan Anda. Misalnya:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Jika Anda menggunakan penjadwal bawaan, Anda harus memulai pekerja dengan dukungan penjadwal:
python kelola.py rqworker --dengan-penjadwal
Alternatifnya, Anda dapat menggunakan Penjadwal RQ. Setelah instalasi, Anda juga dapat menggunakan fungsi get_scheduler
untuk mengembalikan instance Scheduler
untuk antrian yang ditentukan dalam RQ_QUEUES
settings.py. Misalnya:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Anda juga dapat menggunakan perintah manajemen rqscheduler
untuk memulai penjadwal:
python kelola.py rqscheduler
Jika Anda telah menginstal Django-redis atau Django-redis-cache, Anda dapat menginstruksikan Django_rq untuk menggunakan informasi koneksi yang sama dari cache Redis Anda. Ini memiliki dua keuntungan: KERING dan memanfaatkan pengoptimalan apa pun yang mungkin terjadi dalam pengaturan cache Anda (seperti menggunakan pengumpulan koneksi atau Hiredis.)
Untuk menggunakan konfigurasinya, gunakan dict dengan kunci USE_REDIS_CACHE
yang menunjuk ke nama cache yang diinginkan di dict RQ_QUEUES
Anda. Tentu saja cache yang dipilih harus ada dan menggunakan backend Redis. Lihat dokumen paket cache Redis Anda masing-masing untuk petunjuk konfigurasi. Penting juga untuk menunjukkan bahwa karena Django-redis-cache ShardedClient
membagi cache ke beberapa koneksi Redis, itu tidak berfungsi.
Berikut adalah contoh fragmen pengaturan untuk Django-redis:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
Terkadang Anda mungkin ingin menangguhkan RQ untuk mencegahnya memproses pekerjaan baru. Contoh klasiknya adalah selama fase awal skrip penerapan atau sebelum menempatkan situs Anda ke mode pemeliharaan. Hal ini sangat membantu ketika Anda memiliki pekerjaan yang relatif berjalan lama dan mungkin akan dihentikan secara paksa selama penerapan.
Perintah suspend menghentikan pekerja di antrean _all_ (dalam satu database Redis) untuk mengambil pekerjaan baru. Namun pekerjaan yang sedang berjalan akan terus berlanjut hingga selesai.
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
juga menyediakan dasbor untuk memantau status antrian Anda di /django-rq/
(atau URL apa pun yang Anda atur di urls.py
selama instalasi.
Anda juga dapat menambahkan tautan ke tautan dasbor ini di /admin
dengan menambahkan RQ_SHOW_ADMIN_LINK = True
di settings.py
. Namun berhati-hatilah, ini akan mengesampingkan template admin default sehingga dapat mengganggu aplikasi lain yang memodifikasi template admin default.
Statistik ini juga tersedia dalam format JSON melalui /django-rq/stats.json
, yang dapat diakses oleh anggota staf. Jika Anda perlu mengakses tampilan ini melalui klien HTTP lainnya (untuk tujuan pemantauan), Anda dapat mendefinisikan RQ_API_TOKEN
dan mengaksesnya melalui /django-rq/stats.json/<API_TOKEN>
.
Catatan: Statistik pekerjaan terjadwal menampilkan pekerjaan dari penjadwal bawaan RQ, bukan penjadwal RQ opsional.
Selain itu, statistik ini juga dapat diakses dari baris perintah.
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Penjaga harus dikonfigurasi dalam Django settings.py
seperti yang dijelaskan dalam dokumen Sentry.
Anda dapat mengganti konfigurasi default Django Sentry saat menjalankan perintah rqworker
dengan meneruskan opsi sentry-dsn
:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
Ini akan mengesampingkan konfigurasi Django yang ada dan menginisialisasi ulang Sentry, menyetel opsi Sentry berikut:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ menggunakan logging
Python, ini berarti Anda dapat dengan mudah mengkonfigurasi mekanisme pencatatan rqworker
di settings.py
Django. Misalnya:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
Secara default, setiap antrian akan menggunakan kelas DjangoRQ
. Jika Anda ingin menggunakan kelas antrian khusus, Anda dapat melakukannya dengan menambahkan opsi QUEUE_CLASS
per antrian di RQ_QUEUES
:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
atau Anda dapat menentukan DjangoRQ
untuk menggunakan kelas khusus untuk semua antrian Anda dalam pengaturan RQ
:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
Kelas antrian khusus harus mewarisi dari django_rq.queues.DjangoRQ
.
Jika Anda menggunakan lebih dari satu kelas antrian (tidak disarankan), pastikan hanya menjalankan pekerja pada antrian dengan kelas antrian yang sama. Misalnya jika Anda memiliki dua antrean yang ditentukan di RQ_QUEUES
dan satu antrean memiliki kelas khusus yang ditentukan, Anda harus menjalankan setidaknya dua pekerja terpisah untuk setiap antrean.
Mirip dengan kelas antrian khusus, kelas pekerjaan dan pekerja khusus global dapat dikonfigurasi menggunakan pengaturan JOB_CLASS
dan WORKER_CLASS
:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
Kelas pekerjaan khusus harus mewarisi dari rq.job.Job
. Ini akan digunakan untuk semua pekerjaan jika dikonfigurasi.
Kelas pekerja khusus harus mewarisi dari rq.worker.Worker
. Ini akan digunakan untuk menjalankan semua pekerja kecuali diganti dengan opsi worker-class
perintah manajemen rqworker
.
Untuk proses pengujian yang lebih mudah, Anda dapat menjalankan pekerja secara sinkron dengan cara ini:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
Anda dapat mengatur opsi ASYNC
ke False
untuk menjadikan operasi sinkron sebagai default untuk antrean tertentu. Hal ini akan menyebabkan pekerjaan segera dieksekusi dan berada pada thread yang sama dengan pengirimannya, yang berguna untuk pengujian dan debugging. Misalnya, Anda dapat menambahkan yang berikut ini setelah Anda mengantri konfigurasi di file pengaturan Anda:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
Perhatikan bahwa menyetel parameter is_async
secara eksplisit saat memanggil get_queue
akan mengesampingkan setelan ini.
Untuk menjalankan rangkaian pengujian django_rq
:
`yang Django-admin` menguji Django_rq --settings=django_rq.tests.settings --pythonpath=.
Buat layanan rqworker yang menjalankan antrian tinggi, default, dan rendah.
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
Aktifkan dan mulai layanan
sudo systemctl enable rqworker
sudo systemctl start rqworker
Tambahkan Django-rq ke berkas persyaratan.txt Anda dengan:
pip freeze > requirements.txt
Perbarui Profil Anda menjadi:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
Komit dan terapkan kembali. Kemudian tambahkan pekerja baru Anda dengan:
heroku scale worker=1
Lihat CHANGELOG.md.