Интеграция Django с RQ, библиотекой очередей Python на основе Redis. Django-RQ — это простое приложение, которое позволяет вам настраивать очереди в settings.py
django и легко использовать их в своем проекте.
Если вы считаете django-rq
полезным, рассмотрите возможность поддержки его разработки через Tidelift.
django-rq
(или скачайте с PyPI): pip install django - rq
django_rq
в INSTALLED_APPS
в settings.py
: INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
django: RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
django_rq.urls
в свой urls.py
: urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ позволяет вам легко помещать задания в любую из очередей, определенных в settings.py
. Он поставляется с несколькими полезными функциями:
enqueue
— поместить задание в очередь default
: import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
— возвращает экземпляр Queue
. import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
Помимо аргумента name
, get_queue
также принимает аргументы default_timeout
, is_async
, autocommit
, connection
и queue_class
. Например:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
Вы можете предоставить этой функции свой собственный одноэлементный объект соединения Redis, чтобы она не создавала новый объект соединения для каждого определения очереди. Это поможет вам ограничить количество подключений к серверу Redis. Например:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
— принимает один аргумент имени очереди (по умолчанию «по умолчанию») и возвращает соединение с сервером Redis очереди: import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
— принимает необязательные имена очередей и возвращает новый экземпляр RQ Worker
для указанных очередей (или очереди default
): import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
Чтобы легко превратить вызываемый объект в задачу RQ, вы также можете использовать декоратор @job
, который поставляется с django_rq
:
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
Вы можете передать любые аргументы, которые принимает декоратор задания RQ:
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
Можно указать значение по умолчанию для аргумента ключевого слова декоратора result_ttl
с помощью параметра DEFAULT_RESULT_TTL
:
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
При использовании этого параметра декоратор задания установит result_ttl
равным 5000, если это не указано явно.
django_rq предоставляет команду управления, которая запускает работника для каждой очереди, указанной в качестве аргументов:
python Manage.py rqworker высокий по умолчанию низкий
Если вы хотите запустить rqworker
в пакетном режиме, вы можете передать флаг --burst
:
python Manage.py rqworker высокий по умолчанию низкий --burst
Если вам нужно использовать пользовательские классы рабочих, заданий или очередей, лучше всего использовать глобальные настройки (см. Пользовательские классы очередей и Пользовательские классы заданий и рабочих). Однако такие настройки также можно переопределить с помощью параметров командной строки следующим образом.
Чтобы использовать собственный рабочий класс, вы можете передать флаг --worker-class
с путем к вашему рабочему классу:
python Manage.py rqworker высокий по умолчанию низкий --worker-class 'path.to.GeventWorker'
Чтобы использовать собственный класс очереди, вы можете передать флаг --queue-class
с путем к вашему классу очереди:
python Manage.py rqworker высокий по умолчанию низкий --queue-class 'path.to.CustomQueue'
Чтобы использовать собственный класс задания, укажите флаг --job-class
.
Начиная с версии 2.10, также поддерживается запуск рабочего пула RQ:
python Manage.py rqworker-pool по умолчанию низкий средний --num-workers 4
С РК 1.2.0. вы можете использовать встроенный планировщик для своих заданий. Например:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Если вы используете встроенный планировщик, вам необходимо запускать воркеры с поддержкой планировщика:
python Manage.py rqworker --with-scheduler
Альтернативно вы можете использовать RQ Scheduler. После установки вы также можете использовать функцию get_scheduler
для возврата экземпляра Scheduler
для очередей, определенных в RQ_QUEUES
файла settings.py. Например:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
Вы также можете использовать команду управления rqscheduler
для запуска планировщика:
Python Manage.py rqscheduler
Если у вас установлен django-redis или django-redis-cache, вы можете указать django_rq использовать ту же информацию о соединении из вашего кеша Redis. Это имеет два преимущества: это DRY и он использует любую оптимизацию, которая может происходить в настройке вашего кэша (например, использование пула соединений или Hiredis).
Чтобы использовать его настройку, используйте dict с ключом USE_REDIS_CACHE
указывающим на имя желаемого кеша в вашем словаре RQ_QUEUES
. Само собой разумеется, что выбранный кеш должен существовать и использовать бэкэнд Redis. Инструкции по настройке см. в документации соответствующего пакета кэша Redis. Также важно отметить, что, поскольку ShardedClient
django-redis-cache разделяет кеш на несколько соединений Redis, он не работает.
Вот пример фрагмента настроек для django-redis:
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
Иногда вам может потребоваться приостановить RQ, чтобы предотвратить обработку новых заданий. Классический пример — на начальном этапе сценария развертывания или перед переводом сайта в режим обслуживания. Это особенно полезно, если у вас есть задания, которые выполняются относительно долго и в противном случае могут быть принудительно прекращены во время развертывания.
Команда suspend не позволяет работникам во всех очередях (в одной базе данных Redis) получать новые задания. Однако текущие задания будут продолжаться до завершения.
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
также предоставляет панель мониторинга для мониторинга состояния ваших очередей в /django-rq/
(или любой другой URL-адрес, который вы указали в своем urls.py
во время установки).
Вы также можете добавить ссылку на эту панель мониторинга в /admin
добавив RQ_SHOW_ADMIN_LINK = True
в settings.py
. Однако будьте осторожны: это переопределит шаблон администратора по умолчанию и может помешать работе других приложений, изменяющих шаблон администратора по умолчанию.
Эта статистика также доступна в формате JSON через /django-rq/stats.json
, который доступен сотрудникам. Если вам необходимо получить доступ к этому представлению через другие HTTP-клиенты (в целях мониторинга), вы можете определить RQ_API_TOKEN
и получить к нему доступ через /django-rq/stats.json/<API_TOKEN>
.
Примечание. В статистике запланированных заданий отображаются задания из встроенного планировщика RQ, а не из дополнительного планировщика RQ.
Кроме того, эта статистика также доступна из командной строки.
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry следует настроить в файле settings.py
Django, как описано в документации Sentry.
Вы можете переопределить конфигурацию Django Sentry по умолчанию при запуске команды rqworker
, передав опцию sentry-dsn
:
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
Это переопределит любую существующую конфигурацию Django и повторно инициализирует Sentry, установив следующие параметры Sentry:
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ использует logging
Python, это означает, что вы можете легко настроить механизм ведения журнала rqworker
в settings.py
django. Например:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
По умолчанию каждая очередь будет использовать класс DjangoRQ
. Если вы хотите использовать собственный класс очереди, вы можете сделать это, добавив параметр QUEUE_CLASS
для каждой очереди в RQ_QUEUES
:
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
или вы можете указать DjangoRQ
использовать собственный класс для всех ваших очередей в настройках RQ
:
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
Пользовательские классы очередей должны наследовать от django_rq.queues.DjangoRQ
.
Если вы используете более одного класса очереди (не рекомендуется), обязательно запускайте рабочие процессы только в очередях с одним и тем же классом очереди. Например, если у вас есть две очереди, определенные в RQ_QUEUES
, и в одной указан пользовательский класс, вам придется запустить как минимум два отдельных исполнителя для каждой очереди.
Подобно пользовательским классам очередей, глобальные пользовательские классы заданий и рабочих классов можно настроить с помощью параметров JOB_CLASS
и WORKER_CLASS
:
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
Пользовательский класс задания должен наследовать от rq.job.Job
. Если он настроен, он будет использоваться для всех заданий.
Пользовательский рабочий класс должен наследовать от rq.worker.Worker
. Он будет использоваться для запуска всех рабочих процессов, если он не переопределен опцией worker-class
команды управления rqworker
.
Для упрощения процесса тестирования вы можете запустить рабочий процесс синхронно следующим образом:
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
Вы можете установить для параметра ASYNC
значение False
чтобы сделать синхронную операцию по умолчанию для данной очереди. Это приведет к немедленному выполнению заданий в том же потоке, в котором они были отправлены, что полезно для тестирования и отладки. Например, вы можете добавить следующее после постановки конфигурации в очередь в файле настроек:
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
Обратите внимание, что явная установка параметра is_async
при вызове get_queue
переопределит этот параметр.
Чтобы запустить набор тестов django_rq
:
`который django-admin` проверяет django_rq --settings=django_rq.tests.settings --pythonpath=.
Создайте службу rqworker, которая запускает очереди высокого, стандартного и низкого уровня.
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
Включите и запустите службу
sudo systemctl enable rqworker
sudo systemctl start rqworker
Добавьте django-rq в файл require.txt с помощью:
pip freeze > requirements.txt
Обновите свой профиль, чтобы:
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
Зафиксируйте и повторно разверните. Затем добавьте нового работника с помощью:
heroku scale worker=1
См. CHANGELOG.md.