Django と Redis ベースの Python キューイング ライブラリである RQ の統合。 Django-RQ は、Django のsettings.py
でキューを構成し、プロジェクトで簡単にキューを使用できるようにするシンプルなアプリです。
django-rq
が役に立つと思われる場合は、Tidelift を通じてその開発をサポートすることを検討してください。
django-rq
をインストールします (または PyPI からダウンロードします)。 pip install django - rq
django_rq
settings.py
のINSTALLED_APPS
に追加します。 INSTALLED_APPS = (
# other apps
"django_rq" ,
)
settings.py
でキューを設定します。 RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'USERNAME' : 'some-user' ,
'PASSWORD' : 'some-password' ,
'DEFAULT_TIMEOUT' : 360 ,
'REDIS_CLIENT_KWARGS' : { # Eventual additional Redis connection arguments
'ssl_cert_reqs' : None ,
},
},
'with-sentinel' : {
'SENTINELS' : [( 'localhost' , 26736 ), ( 'localhost' , 26737 )],
'MASTER_NAME' : 'redismaster' ,
'DB' : 0 ,
# Redis username/password
'USERNAME' : 'redis-user' ,
'PASSWORD' : 'secret' ,
'SOCKET_TIMEOUT' : 0.3 ,
'CONNECTION_KWARGS' : { # Eventual additional Redis connection arguments
'ssl' : True
},
'SENTINEL_KWARGS' : { # Eventual Sentinel connection arguments
# If Sentinel also has auth, username/password can be passed here
'username' : 'sentinel-user' ,
'password' : 'secret' ,
},
},
'high' : {
'URL' : os . getenv ( 'REDISTOGO_URL' , 'redis://localhost:6379/0' ), # If you're on Heroku
'DEFAULT_TIMEOUT' : 500 ,
},
'low' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
}
}
RQ_EXCEPTION_HANDLERS = [ 'path.to.my.handler' ] # If you need custom exception handlers
urls.py
にdjango_rq.urls
を含めます。 urlpatterns += [
path ( 'django-rq/' , include ( 'django_rq.urls' ))
]
Django-RQ を使用すると、 settings.py
で定義されたキューにジョブを簡単に入れることができます。いくつかのユーティリティ関数が付属しています。
enqueue
- ジョブをdefault
キューにプッシュします。 import django_rq
django_rq . enqueue ( func , foo , bar = baz )
get_queue
- Queue
インスタンスを返します。 import django_rq
queue = django_rq . get_queue ( 'high' )
queue . enqueue ( func , foo , bar = baz )
name
引数に加えて、 get_queue
default_timeout
、 is_async
、 autocommit
、 connection
、およびqueue_class
引数も受け入れます。例えば:
queue = django_rq . get_queue ( 'default' , autocommit = True , is_async = True , default_timeout = 360 )
queue . enqueue ( func , foo , bar = baz )
キュー定義ごとに新しい接続オブジェクトが作成されないように、独自のシングルトン Redis 接続オブジェクトをこの関数に提供できます。これは、Redis サーバーへの接続数を制限するのに役立ちます。例えば:
import django_rq
import redis
redis_cursor = redis . StrictRedis ( host = '' , port = '' , db = '' , password = '' )
high_queue = django_rq . get_queue ( 'high' , connection = redis_cursor )
low_queue = django_rq . get_queue ( 'low' , connection = redis_cursor )
get_connection
- 単一のキュー名引数 (デフォルトは「default」) を受け入れ、キューの Redis サーバーへの接続を返します。 import django_rq
redis_conn = django_rq . get_connection ( 'high' )
get_worker
- オプションのキュー名を受け入れ、指定されたキュー (またはdefault
キュー) の新しい RQ Worker
インスタンスを返します。 import django_rq
worker = django_rq . get_worker () # Returns a worker for "default" queue
worker . work ()
worker = django_rq . get_worker ( 'low' , 'high' ) # Returns a worker for "low" and "high"
呼び出し可能オブジェクトを RQ タスクに簡単に変換するには、 django_rq
に付属する@job
デコレータを使用することもできます。
from django_rq import job
@ job
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "default" queue
@ job ( 'high' )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function in "high" queue
RQ のジョブ デコレータが受け入れる任意の引数を渡すことができます。
@ job ( 'default' , timeout = 3600 )
def long_running_func ():
pass
long_running_func . delay () # Enqueue function with a timeout of 3600 seconds.
DEFAULT_RESULT_TTL
設定を使用して、 result_ttl
デコレータ キーワード引数のデフォルトを指定できます。
RQ = {
'DEFAULT_RESULT_TTL' : 5000 ,
}
この設定では、明示的に指定されていない限り、ジョブ デコレーターはresult_ttl
5000 に設定します。
django_rq は、引数として指定されたキューごとにワーカーを開始する管理コマンドを提供します。
python manage.py rqworker 高デフォルト低
rqworker
バースト モードで実行する場合は、 --burst
フラグを渡すことができます。
python manage.py rqworker 高デフォルト低 --burst
カスタム ワーカー、ジョブ、またはキュー クラスを使用する必要がある場合は、グローバル設定を使用するのが最善です (カスタム キュー クラスおよびカスタム ジョブおよびワーカー クラスを参照)。ただし、次のようにコマンド ライン オプションを使用してそのような設定をオーバーライドすることもできます。
カスタム ワーカー クラスを使用するには、ワーカーへのパスを指定して--worker-class
フラグを渡すことができます。
python manage.py rqworker high デフォルト low --worker-class 'path.to.GeventWorker'
カスタム キュー クラスを使用するには、キュー クラスへのパスを指定した--queue-class
フラグを渡します。
python manage.py rqworker high デフォルト low --queue-class 'path.to.CustomQueue'
カスタム ジョブ クラスを使用するには、 --job-class
フラグを指定します。
バージョン 2.10 以降、RQ のワーカー プールの実行もサポートされています。
python manage.py rqworker-pool デフォルト低中 --num-workers 4
RQ 1.2.0 を使用。ジョブに組み込みのスケジューラを使用できます。例えば:
from django_rq . queues import get_queue
queue = get_queue ( 'default' )
job = queue . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
組み込みスケジューラを使用している場合は、スケジューラ サポートを使用してワーカーを開始する必要があります。
python manage.py rqworker --with-scheduler
あるいは、RQ スケジューラを使用することもできます。インストール後、 get_scheduler
関数を使用して、 settings.py のRQ_QUEUES
で定義されたキューのScheduler
インスタンスを返すこともできます。例えば:
import django_rq
scheduler = django_rq . get_scheduler ( 'default' )
job = scheduler . enqueue_at ( datetime ( 2020 , 10 , 10 ), func )
管理コマンドrqscheduler
使用してスケジューラーを開始することもできます。
Python manage.py rqscheduler
django-redis または django-redis-cache がインストールされている場合は、Redis キャッシュから同じ接続情報を使用するように django_rq に指示できます。これには 2 つの利点があります。DRY であることと、キャッシュ設定で行われる最適化 (接続プーリングや Hiredis の使用など) を利用できることです。
これを構成するには、 RQ_QUEUES
辞書内の目的のキャッシュの名前を指すキーUSE_REDIS_CACHE
を持つ辞書を使用します。言うまでもなく、選択したキャッシュが存在し、Redis バックエンドを使用する必要があります。構成手順については、各 Redis キャッシュ パッケージのドキュメントを参照してください。 django-redis-cache ShardedClient
キャッシュを複数の Redis 接続に分割するため、機能しないことを指摘することも重要です。
django-redis の設定フラグメントの例を次に示します。
CACHES = {
'redis-cache' : {
'BACKEND' : 'redis_cache.cache.RedisCache' ,
'LOCATION' : 'localhost:6379:1' ,
'OPTIONS' : {
'CLIENT_CLASS' : 'django_redis.client.DefaultClient' ,
'MAX_ENTRIES' : 5000 ,
},
},
}
RQ_QUEUES = {
'high' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
'low' : {
'USE_REDIS_CACHE' : 'redis-cache' ,
},
}
場合によっては、RQ が新しいジョブを処理できないようにするために、RQ を一時停止したい場合があります。典型的な例は、展開スクリプトの初期段階中、またはサイトをメンテナンス モードに移行する前です。これは、比較的長時間実行され、展開中に強制終了される可能性があるジョブがある場合に特に役立ちます。
stop コマンドは、(単一 Redis データベース内の) _all_ キュー上のワーカーが新しいジョブを取得するのを停止します。ただし、現在実行中のジョブは完了するまで続行されます。
# Suspend indefinitely
python manage.py rqsuspend
# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600
# Resume work again.
python manage.py rqresume
django_rq
/django-rq/
(またはインストール中にurls.py
に設定した任意の URL) でキューのステータスを監視するためのダッシュボードも提供します。
settings.py
にRQ_SHOW_ADMIN_LINK = True
を追加することで、 /admin
にこのダッシュボード リンクへのリンクを追加することもできます。ただし、これによりデフォルトの管理テンプレートがオーバーライドされるため、デフォルトの管理テンプレートを変更する他のアプリと干渉する可能性があることに注意してください。
これらの統計は、スタッフ メンバーがアクセスできる/django-rq/stats.json
経由で JSON 形式でも入手できます。 (監視目的で) 他の HTTP クライアント経由でこのビューにアクセスする必要がある場合は、 RQ_API_TOKEN
定義し、 /django-rq/stats.json/<API_TOKEN>
経由でアクセスできます。
注: スケジュールされたジョブの統計には、オプションの RQ スケジューラではなく、RQ 組み込みスケジューラからのジョブが表示されます。
さらに、これらの統計にはコマンド ラインからもアクセスできます。
python manage.py rqstats
python manage.py rqstats --interval=1 # Refreshes every second
python manage.py rqstats --json # Output as JSON
python manage.py rqstats --yaml # Output as YAML
Sentry のドキュメントの説明に従って、Sentry は Django settings.py
内で構成する必要があります。
rqworker
コマンドの実行時に、 sentry-dsn
オプションを渡すことで、デフォルトの Django Sentry 構成をオーバーライドできます。
./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222
これにより、既存の Django 構成がオーバーライドされ、Sentry が再初期化され、次の Sentry オプションが設定されます。
{
'debug' : options . get ( 'sentry_debug' ),
'ca_certs' : options . get ( 'sentry_ca_certs' ),
'integrations' : [ RedisIntegration (), RqIntegration (), DjangoIntegration ()]
}
RQ は Python のlogging
を使用します。これは、django のsettings.py
でrqworker
のロギング メカニズムを簡単に設定できることを意味します。例えば:
LOGGING = {
"version" : 1 ,
"disable_existing_loggers" : False ,
"formatters" : {
"rq_console" : {
"format" : "%(asctime)s %(message)s" ,
"datefmt" : "%H:%M:%S" ,
},
},
"handlers" : {
"rq_console" : {
"level" : "DEBUG" ,
"class" : "rq.logutils.ColorizingStreamHandler" ,
"formatter" : "rq_console" ,
"exclude" : [ "%(asctime)s" ],
},
},
'loggers' : {
"rq.worker" : {
"handlers" : [ "rq_console" , "sentry" ],
"level" : "DEBUG"
},
}
}
デフォルトでは、すべてのキューでDjangoRQ
クラスが使用されます。カスタム キュー クラスを使用したい場合は、 RQ_QUEUES
でキューごとにQUEUE_CLASS
オプションを追加します。
RQ_QUEUES = {
'default' : {
'HOST' : 'localhost' ,
'PORT' : 6379 ,
'DB' : 0 ,
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
}
または、 RQ
設定ですべてのキューにカスタム クラスを使用するようにDjangoRQ
指定することもできます。
RQ = {
'QUEUE_CLASS' : 'module.path.CustomClass' ,
}
カスタム キュー クラスはdjango_rq.queues.DjangoRQ
から継承する必要があります。
複数のキュー クラスを使用している場合 (推奨されません)、同じキュー クラスのキューでのみワーカーを実行するようにしてください。たとえば、 RQ_QUEUES
で定義されたキューが 2 つあり、そのうちの 1 つにカスタム クラスが指定されている場合、キューごとに少なくとも 2 つの別々のワーカーを実行する必要があります。
カスタム キュー クラスと同様に、グローバル カスタム ジョブおよびワーカー クラスは、 JOB_CLASS
およびWORKER_CLASS
設定を使用して構成できます。
RQ = {
'JOB_CLASS' : 'module.path.CustomJobClass' ,
'WORKER_CLASS' : 'module.path.CustomWorkerClass' ,
}
カスタム ジョブ クラスはrq.job.Job
から継承する必要があります。構成されている場合は、すべてのジョブに使用されます。
カスタム ワーカー クラスはrq.worker.Worker
から継承する必要があります。これは、 rqworker
管理コマンドのworker-class
オプションによってオーバーライドされない限り、すべてのワーカーの実行に使用されます。
テストプロセスを簡単にするために、次の方法でワーカーを同期的に実行できます。
from django . test import TestCase
from django_rq import get_worker
class MyTest ( TestCase ):
def test_something_that_creates_jobs ( self ):
... # Stuff that init jobs.
get_worker (). work ( burst = True ) # Processes all jobs then stop.
... # Asserts that the job stuff is done.
オプションASYNC
False
に設定すると、特定のキューの同期操作をデフォルトにすることができます。これにより、ジョブがディスパッチされたときと同じスレッド上で即時に実行されるため、テストやデバッグに役立ちます。たとえば、設定ファイルで構成をキューに追加した後に、次の内容を追加できます。
# ... Logic to set DEBUG and TESTING settings to True or False ...
# ... Regular RQ_QUEUES setup code ...
if DEBUG or TESTING :
for queueConfig in RQ_QUEUES . values ():
queueConfig [ 'ASYNC' ] = False
get_queue
呼び出すときにis_async
パラメーターを明示的に設定すると、この設定がオーバーライドされることに注意してください。
django_rq
のテスト スイートを実行するには:
`どの django-admin` が django_rq --settings=django_rq.tests.settings --pythonpath= をテストします。
高キュー、デフォルトキュー、低キューを実行する rqworker サービスを作成します。
sudo vi /etc/systemd/system/rqworker.service
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory= << path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python
<<path_to_your_project_folder>>/manage.py
rqworker high default low
[Install]
WantedBy=multi-user.target
サービスを有効にして開始する
sudo systemctl enable rqworker
sudo systemctl start rqworker
次のように django-rq をrequirements.txt ファイルに追加します。
pip freeze > requirements.txt
Procfile を次のように更新します。
web: gunicorn --pythonpath= " $PWD /your_app_name " config.wsgi:application
worker: python your_app_name/manage.py rqworker high default low
コミットして再デプロイします。次に、次のようにして新しいワーカーを追加します。
heroku scale worker=1
CHANGELOG.mdを参照してください。