RabbitMqBundle
、php-amqplib ライブラリを使用して、RabbitMQ 経由でアプリケーションにメッセージングを組み込みます。
このバンドルは、Thumper ライブラリで見られるようないくつかのメッセージング パターンを実装しています。したがって、Symfony コントローラーから RabbitMQ にメッセージをパブリッシュするのは次のように簡単です。
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
後で、 upload_pictures
キューから 50 個のメッセージを消費したい場合は、CLI で次のコマンドを実行するだけです。
$ ./app/console rabbitmq:consumer -m 50 upload_picture
すべての例では、RabbitMQ サーバーが実行されていることを想定しています。
このバンドルは、Symfony Live Paris 2011 カンファレンスで発表されました。こちらのスライドをご覧ください。
Symfony >=4.4 によって発生した重大な変更のため、新しいタグがリリースされ、バンドルは Symfony >=4.4 と互換性がありました。
バンドルとその依存関係を Composer に要求します。
$ composer require php-amqplib/rabbitmq-bundle
バンドルを登録します。
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
楽しむ !
RabbitMQ コンシューマーの実行に使用されるコンソール アプリケーションがある場合、Symfony HttpKernel と FrameworkBundle は必要ありません。バージョン 1.6 以降では、Dependency Injection コンポーネントを使用してこのバンドル構成とサービスをロードし、consumer コマンドを使用できます。
Composer.json ファイルにバンドルが必要です。
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
拡張機能とコンパイラ パスを登録します。
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
2012 年 6 月 4 日以降、「プロデューサー」構成セクションで宣言された取引所の一部のデフォルト オプションが、「コンシューマー」セクションで宣言された取引所のデフォルトと一致するように変更されました。影響を受ける設定は次のとおりです。
durable
false
からtrue
に変更されました。auto_delete
true
からfalse
に変更されました。以前のデフォルト値に依存していた場合は、構成を更新する必要があります。
2012 年 4 月 24 日以降、ConsumerInterface::execute メソッドのシグネチャが変更されました
2012 年 1 月 3 日以降、コンシューマーの実行メソッドは本文だけでなく AMQP メッセージ オブジェクト全体を取得するようになりました。詳細については、CHANGELOG ファイルを参照してください。
構成ファイルにold_sound_rabbit_mq
セクションを追加します。
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
ここでは、アプリケーションが持つ接続サービスとメッセージ エンドポイントを構成します。この例では、サービス コンテナにはサービスold_sound_rabbit_mq.upload_picture_producer
およびold_sound_rabbit_mq.upload_picture_consumer
が含まれます。後者は、 upload_picture_service
というサービスがあることを想定しています。
クライアントの接続を指定しない場合、クライアントは同じエイリアスを持つ接続を検索します。そのため、 upload_picture
の場合、サービス コンテナは、 upload_picture
接続を探します。
オプションのキュー引数を追加する必要がある場合、キューのオプションは次のようになります。
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
メッセージ TTL が 20 秒の別の例:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
引数値はデータ型と値のリストである必要があります。有効なデータ型は次のとおりです。
S
- 文字列I
- 整数D
- 10進数T
- タイムスタンプF
- テーブルA
- 配列t
- ブール値必要に応じてarguments
を調整します。
キューを特定のルーティング キーでバインドしたい場合は、プロデューサーまたはコンシューマー構成でそれを宣言できます。
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
Symfony 環境では、すべてのサービスはリクエストごとに完全にブートストラップされます。バージョン 4.3 以上では、サービスを遅延 (Lazy Services) として宣言できます。このバンドルはまだ新しい Lazy Services 機能をサポートしていませんが、接続構成でlazy: true
を設定すると、リクエストごとにメッセージ ブローカーへの不必要な接続を回避できます。パフォーマンス上の理由から、遅延接続を使用することを強くお勧めします。ただし、すでにこのバンドルを使用しているアプリケーションでの中断の可能性を避けるために、遅延オプションはデフォルトで無効になっています。
ソケットがオープンになるように、 read_write_timeout
ハートビートの 2 倍に設定することをお勧めします。これを行わない場合、または別の乗算器を使用すると、コンシューマソケットがタイムアウトになる危険性があります。
一般的にタスクがハートビート期間よりも長く実行されている場合、問題が発生する可能性があり、適切な解決策が存在しないことに留意してください (リンク)。ハートビートに大きな値を使用するか、TCP のkeepalive
(クライアント側とサーバー側の両方) とgraceful_max_execution_timeout
機能を優先してハートビートを無効のままにすることを検討してください。
1 つの接続に複数のホストを指定できます。これにより、複数のノードで RabbitMQ クラスターを使用できるようになります。
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
指定はできませんのでご注意ください
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
パラメータを各ホストに個別に送信します。
場合によっては、接続情報を動的にする必要があるかもしれません。動的接続パラメータを使用すると、サービスを通じてプログラムでパラメータを指定またはオーバーライドできます。
たとえば、接続のvhost
パラメータがホワイトラベル アプリケーションの現在のテナントに依存しており、その構成を毎回変更したくない (または変更できない) シナリオの場合です。
ConnectionParametersProviderInterface
実装するサービスをconnection_parameters_provider
の下に定義し、それを適切なconnections
構成に追加します。
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
実装例:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
この場合、 vhost
パラメータはgetVhost()
の出力によってオーバーライドされます。
メッセージング アプリケーションでは、ブローカーにメッセージを送信するプロセスはプロデューサーと呼ばれ、それらのメッセージを受信するプロセスはコンシューマーと呼ばれます。アプリケーションには、構成内のそれぞれのエントリの下にリストできるものがいくつかあります。
プロデューサは、サーバーにメッセージを送信するために使用されます。 AMQP モデルでは、メッセージはExchangeに送信されます。これは、プロデューサーの構成で、Exchange オプションとともに接続オプションを指定する必要があることを意味します。これは通常、Exchange の名前とそのタイプになります。
ここで、写真のアップロードをバックグラウンドで処理したいとします。画像を最終的な場所に移動した後、次の情報を含むメッセージをサーバーに公開します。
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
ご覧のとおり、構成にUpload_pictureというプロデューサーがある場合、サービス コンテナーにはold_sound_rabbit_mq.upload_picture_Producerというサービスが存在します。
メッセージ自体に加えて、 OldSoundRabbitMqBundleRabbitMqProducer#publish()
メソッドは、オプションのルーティング キー パラメーターと追加プロパティのオプションの配列も受け入れます。追加のプロパティの配列を使用すると、デフォルトでPhpAmqpLibMessageAMQPMessage
オブジェクトが構築されるプロパティを変更できます。このようにして、たとえばアプリケーション ヘッダーを変更できます。
setContentType メソッドとsetDeliverModeメソッドを使用して、メッセージ コンテンツ タイプとメッセージ配信モードをそれぞれ設定し、「プロデューサー」構成セクションで設定されたデフォルトをオーバーライドできます。 (以下の例のように) 「プロデューサー」構成またはこれらのメソッドの明示的な呼び出しによってオーバーライドされない場合、デフォルト値は、コンテンツ タイプの場合はtext/plain 、配信モードの場合は2です。
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
プロデューサーのカスタム クラス ( OldSoundRabbitMqBundleRabbitMqProducer
から継承する必要がある) を使用する必要がある場合は、 class
オプションを使用できます。
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
パズルの次のピースは、メッセージをキューから取り出し、それに応じて処理するコンシューマーを用意することです。
現在、プロデューサーによって発行されたイベントが 2 つあります。
このイベントは、メッセージを公開する直前に発生します。これは、実際にメッセージを送信する前に最終的なログ記録や検証などを行うための優れたフックです。リスナーの実装例:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
このイベントは、メッセージの公開直後に発生します。これは、実際にメッセージを送信した後に確認のログ記録やコミットなどを行うための優れたフックです。リスナーの実装例:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
コンシューマはサーバーに接続し、受信メッセージの処理を待つループを開始します。このようなコンシューマの動作は、指定されたコールバックに応じて異なります。上記のコンシューマー構成を確認してみましょう。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
ここでわかるように、コールバックオプションには、 upload_picture_serviceへの参照があります。コンシューマーがサーバーからメッセージを受け取ると、そのようなコールバックが実行されます。テストまたはデバッグの目的で別のコールバックを指定する必要がある場合は、そこで変更できます。
コールバックとは別に、プロデューサーの場合と同じ方法で、使用する接続も指定します。残りのオプションは、 exchange_optionsとqueue_optionsです。 exchange_options は、プロデューサーに使用されるものと同じである必要があります。 queue_optionsでキュー名を指定します。なぜ?
前述したように、AMQP のメッセージはExchangeにパブリッシュされます。これは、メッセージがキューに到達したことを意味するものではありません。これを実現するには、まずそのようなキューを作成し、それをExchangeにバインドする必要があります。これの優れた点は、複数のキューを1 つのExchangeにバインドできることです。これにより、1 つのメッセージが複数の宛先に到着できるようになります。このアプローチの利点は、生産者と消費者が切り離されることです。プロデューサーは、何人のコンシューマーが自分のメッセージを処理するかについては気にしません。必要なのは、彼のメッセージがサーバーに到着することだけです。このようにして、コントローラーのコードを変更することなく、写真がアップロードされるたびに実行するアクションを拡張できます。
さて、コンシューマをどのように実行すればよいでしょうか?次のように実行できるコマンドがあります。
$ ./app/console rabbitmq:consumer -m 50 upload_picture
これはどういう意味ですか?ここでは、 upload_pictureコンシューマを実行して、メッセージを 50 個だけ消費するように指示しています。コンシューマーはサーバーからメッセージを受信するたびに、AMQP メッセージをPhpAmqpLibMessageAMQPMessage
クラスのインスタンスとして渡す、構成されたコールバックを実行します。メッセージ本文は、 $msg->body
呼び出すことで取得できます。デフォルトでは、コンシューマーは、いくつかの定義の無限ループでメッセージを処理します。
コンシューマが Unix シグナルで即座に実行を終了することを確認したい場合は、 -w
フラグを付けてコマンドを実行できます。
$ ./app/console rabbitmq:consumer -w upload_picture
その後、コンシューマーは即座に実行を終了します。
このフラグを指定してコマンドを使用するには、PCNTL 拡張子を付けて PHP をインストールする必要があります。
コンシューマのメモリ制限を設定したい場合は、フラグ-l
使用して設定できます。次の例では、このフラグにより 256 MB のメモリ制限が追加されます。 PHP の許容メモリ サイズ エラーを回避するために、コンシューマは 256MB に達する 5 MB 手前で停止されます。
$ ./app/console rabbitmq:consumer -l 256
キュー内で待機しているすべてのメッセージを削除する場合は、次のコマンドを実行してこのキューをパージできます。
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
コンシューマのキューを削除するには、次のコマンドを使用します。
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
これは多くのシナリオで役立ちます。 AMQPEvent は 3 つあります。
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
、新しいアプリケーションのデプロイを確認できます。
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
AMQPMessage
処理する前に発生したイベント。
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
AMQPMessage
の処理後に発生したイベント。プロセス メッセージが例外をスローする場合、イベントは発生しません。
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
wait
メソッドがメッセージを受信せずにタイムアウトによって終了すると、イベントが発生します。このイベントを利用するには、コンシューマのidle_timeout
設定する必要があります。デフォルトではアイドルタイムアウトでプロセスが終了しますが、リスナーで$event->setForceStop(false)
設定することでこれを防ぐことができます。
一定期間キューからメッセージがないときにタイムアウトを設定する必要がある場合は、 idle_timeout
秒単位で設定できます。 idle_timeout_exit_code
は、アイドル タイムアウトが発生したときにコンシューマーによって返される終了コードを指定します。これを指定しない場合、コンシューマはPhpAmqpLibExceptionAMQPTimeoutException
例外をスローします。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait
秒単位で設定します。 timeout_wait
現在の接続がまだ有効であることを確認するまでに、コンシューマーが新しいメッセージを受信せずに待機する時間を指定します。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
コンシューマーを特定の時間まで実行してから正常に終了したい場合は、 graceful_max_execution.timeout
秒単位で設定します。 「正常に終了」とは、コンシューマーが現在実行中のタスクの終了後、または新しいタスクを待機しているときに直ちに終了することを意味します。 graceful_max_execution.exit_code
は、グレースフル最大実行タイムアウトが発生したときにコンシューマーによって返される終了コードを指定します。これを指定しないと、コンシューマはステータス0
で終了します。
この機能は、supervisord と組み合わせると優れており、定期的なメモリ リークのクリーンアップ、データベース/rabbitmq の更新などとの接続が可能になります。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
お気づきかもしれませんが、ディスパッチがまだ期待どおりに機能していません。たとえば、2 人のワーカーがいる状況で、奇数のメッセージがすべて重く、偶数のメッセージが軽い場合、1 人のワーカーは常にビジー状態になり、もう 1 人のワーカーはほとんど仕事をしません。まあ、RabbitMQ はそれについて何も知りませんが、それでもメッセージを均等にディスパッチします。
これは、RabbitMQ がメッセージがキューに入ったときにメッセージをディスパッチするだけであるために発生します。コンシューマの未確認メッセージの数は調べません。 n 番目ごとに、n 番目のコンシューマーにメッセージを盲目的にディスパッチするだけです。
これを無効にするために、prefetch_count=1 設定で Basic.qos メソッドを使用できます。これは、RabbitMQ に一度に複数のメッセージをワーカーに与えないように指示します。言い換えると、ワーカーが前のメッセージを処理して確認応答するまで、新しいメッセージをワーカーにディスパッチしないでください。代わりに、まだビジーではない次のワーカーにディスパッチします。
出典: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
公平なディスパッチングを実装すると、パフォーマンスに悪影響を与える遅延が発生するため注意してください (このブログ投稿を参照)。ただし、これを実装すると、キューの増加に応じて水平方向に動的にスケーリングできるようになります。ブログ投稿で推奨されているように、各メッセージの処理にかかる時間とネットワーク パフォーマンスに応じて prefetch_size の適切な値を評価する必要があります。
RabbitMqBundle を使用すると、次のようにコンシューマごとに qos_options を設定できます。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
Symfony 4.2+で使用する場合、バンドルはプロデューサーと通常のコンシューマのエイリアスのコンテナ セットで宣言します。これらは、宣言された型と引数名に基づいて引数を自動配線するために使用されます。これにより、前のプロデューサーの例を次のように変更できます。
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
引数の名前は、設定のプロデューサ名またはコンシューマ名から構築され、タイプに応じてプロデューサまたはコンシューマの語が接尾辞として付けられます。コンテナ項目の命名規則とは対照的に、名前が既に接尾辞として付けられている場合、単語接尾辞 (プロデューサーまたはコンシューマー) は重複しません。 upload_picture
プロデューサー キーは$uploadPictureProducer
引数名に変更されます。 upload_picture_producer
プロデューサー キーは、 $uploadPictureProducer
引数名のエイリアスとしても使用されます。このような類似した名前は避けるのが最善です。
すべてのプロデューサーは、構成からOldSoundRabbitMqBundleRabbitMqProducerInterface
とプロデューサー クラス オプションにエイリアスされます。サンドボックス モードでは、 ProducerInterface
エイリアスのみが作成されます。プロデューサー インジェクションのヒント引数を入力する場合は、 ProducerInterface
クラスを使用することを強くお勧めします。
すべてのコンシューマはOldSoundRabbitMqBundleRabbitMqConsumerInterface
および%old_sound_rabbit_mq.consumer.class%
構成オプション値のエイリアスになります。通常モードとサンドボックス モードに違いはありません。クライアント インジェクションの引数を型ヒントする場合は、 ConsumerInterface
を使用することを強くお勧めします。
コールバックの例を次に示します。
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
ご覧のとおり、これは 1 つのメソッドConsumerInterface::execute を実装するだけで簡単です。
コールバックは通常の Symfony サービスとして登録する必要があることに注意してください。そこにサービスコンテナ、データベースサービス、Symfony ロガーなどを挿入できます。
メッセージ インスタンスの一部の詳細については、https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md を参照してください。
コンシューマーを停止するには、コールバックでStopConsumerException
(最後に消費されたメッセージは ACK になりません) またはAckStopConsumerException
(メッセージは ACK になります) をスローできます。悪魔化されたもの (例: スーパーバイザ) を使用している場合、コンシューマは実際に再起動します。
これは、メッセージを送信するだけでもかなりの作業であるように思えます。概要を理解するために要約してみましょう。メッセージを生成/消費するために必要なものは次のとおりです。
それで終わりです!
これは、受信/公開されたメッセージの追跡可能性を確保するための要件でした。これを有効にするには、 enable_logger
構成をコンシューマーまたはパブリッシャーに追加する必要があります。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
必要に応じて、チャネルphpamqplib
を参照して、monolog の異なるハンドラーを使用してキューからのロギングを処理することもできます。
ここまでは消費者にメッセージを送信しただけですが、消費者からの返信を受け取りたい場合はどうすればよいでしょうか?これを実現するには、RPC 呼び出しをアプリケーションに実装する必要があります。このバンドルを使用すると、Symfony でそのようなことを非常に簡単に実現できます。
RPC クライアントとサーバーを構成に追加しましょう。
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
完全な設定リファレンスについては、 php app/console config:dump-reference old_sound_rabbit_mq
コマンドを使用してください。
ここには非常に便利なサーバーがあり、クライアントにランダムな整数を返します。リクエストの処理に使用されるコールバックは、 random_int_serverサービスになります。次に、コントローラーからそれを呼び出す方法を見てみましょう。
まず、コマンドラインからサーバーを起動する必要があります。
$ ./app/console_dev rabbitmq:rpc-server random_int
次に、次のコードをコントローラーに追加します。
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
ご覧のとおり、クライアント ID がinteger_storeの場合、サービス名はold_sound_rabbit_mq.integer_store_rpcになります。そのオブジェクトを取得したら、次の 3 つのパラメーターを要求するaddRequest
を呼び出してサーバーにリクエストを送信します。
送信する引数は、 rand()
関数の最小値と最大値です。配列をシリアル化して送信します。サーバーが JSON 情報または XML を必要とする場合は、そのようなデータをここに送信します。
最後の部分は、返信を受け取ることです。 PHP スクリプトは、サーバーが値を返すまでブロックします。 $replies変数は、サーバーからの各応答がそれぞれのrequest_idキーに含まれる連想配列になります。
デフォルトでは、RPC クライアントは応答がシリアル化されることを期待します。使用しているサーバーがシリアル化されていない結果を返す場合は、RPC クライアントのexpect_serialized_response
オプションを false に設定します。たとえば、 integer_storeサーバーが結果をシリアル化しなかった場合、クライアントは次のように設定されます。
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
リクエストの有効期限をミリ秒単位で設定することもできます。この期限を過ぎると、メッセージはサーバーによって処理されなくなり、クライアントのリクエストは単にタイムアウトになります。メッセージの有効期限の設定は、RabbitMQ 3.x 以降でのみ機能します。詳細については、http://www.rabbitmq.com/ttl.html#per-message-ttl を参照してください。
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
ご想像のとおり、並列 RPC 呼び出しを行うこともできます。
ある Web ページをレンダリングするために、2 つのデータベース クエリを実行する必要があるとします。1 つは完了するまでに 5 秒かかり、もう 1 つは 2 秒かかります (非常にコストのかかるクエリです)。これらを順番に実行すると、約 7 秒でページを配信できる状態になります。これらを並行して実行すると、約 5 秒でページが表示されます。 RabbitMqBundle
を使用すると、このような並列呼び出しを簡単に行うことができます。構成内で並列クライアントと別の RPC サーバーを定義しましょう。
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
次に、このコードをコントローラーに挿入する必要があります。
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
前の例と非常に似ていますが、追加のaddRequest
呼び出しがあるだけです。また、後で$replies配列内で必要な応答を簡単に見つけられるように、意味のあるリクエスト識別子を提供します。
クライアントへの直接返信を有効にするには、クライアントのrpc_clients設定でオプションdirect_reply_toを有効にするだけです。
このオプションは、RPC 呼び出しを実行するときに疑似キューamq.rabbitmq.reply-toを使用します。 RPC サーバーでは変更は必要ありません。
RabbitMQ には、バージョン 3.5.0 の時点でコアに優先キューが実装されています。クライアントが提供するオプションの引数を使用して、どのキューでも優先キューに変えることができます (ただし、ポリシーではなくオプションの引数を使用する他の機能とは異なります)。この実装では、限られた数の優先順位 (255) がサポートされます。1 ~ 10 の値が推奨されます。ドキュメントを確認する
優先キューを宣言する方法は次のとおりです
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
upload-picture
キューが存在する場合は、 rabbitmq:setup-fabric
コマンドを実行する前にこのキューを削除する必要があります
ここで、優先度の高いメッセージを作成したいとします。この追加情報を含めてメッセージを公開する必要があります。
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
ロジックを分離するために多数のキューを用意することをお勧めします。単純なコンシューマの場合、キューごとに 1 つのワーカー (コンシューマ) を作成する必要があり、多くの進化に対処する場合は管理が困難になる可能性があります (スーパーバイザ設定に行を追加するのを忘れませんか?)。これは、キューほど多くのワーカーを必要とせず、柔軟性と分離の原則を失わずにいくつかのタスクを再グループ化したい場合があるため、小さなキューにも役立ちます。
複数のコンシューマを使用すると、同じコンシューマで複数のキューをリッスンすることで、このユースケースを処理できます。
コンシューマに複数のキューを設定する方法は次のとおりです。
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
コールバックは各キューの下で指定されるようになり、単純なコンシューマと同様にConsumerInterface
実装する必要があります。コンシューマのqueues-options
のすべてのオプションは、各キューで使用できます。
すべてのキューが同じ交換の下にあることに注意してください。コールバックの正しいルーティングを設定するのはユーザーの責任です。
queues_provider
は、キューを動的に提供するオプションのサービスです。 QueuesProviderInterface
実装する必要があります。
キュープロバイダーはsetDequeuer
への適切な呼び出しを担当し、コールバックは呼び出し可能である ( ConsumerInterface
ではない) ことに注意してください。キューを提供するサービスがDequeuerAwareInterface
を実装している場合、 setDequeuer
への呼び出しが、現在MultipleConsumer
であるDequeuerInterface
を使用してサービスの定義に追加されます。
アプリケーションに複雑なワークフローがあり、任意のバインディングが必要になる場合があります。任意のバインディング シナリオには、 destination_is_exchange
プロパティを介したバインディング間の交換が含まれる場合があります。
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
Rabbitmq:setup-fabric コマンドは、任意のバインディングを作成する前に、プロデューサ、コンシューマ、およびマルチ コンシューマの構成で定義されているエクスチェンジとキューを宣言します。ただし、rabbitmq:setup-fabric は、バインディングで定義された追加のキューと交換を宣言しません。エクスチェンジ/キューが宣言されていることを確認するのはあなた次第です。
場合によっては、コンシューマーの構成をその場で変更する必要があります。動的コンシューマを使用すると、コンテキストに基づいてコンシューマのキュー オプションをプログラムで定義できます。
たとえば、定義されたコンシューマーが動的な数のトピックを担当する必要があり、その構成を毎回変更したくない (またはできない) シナリオの場合です。
QueueOptionsProviderInterface
実装するサービスqueue_options_provider
を定義し、 dynamic_consumers
構成に追加します。
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
使用例:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
この場合、 proc_logs
コンシューマはserver1
に対して実行され、使用するキュー オプションを決定できます。
さて、なぜ匿名の消費者が必要になるのでしょうか?これはインターネットの脅威か何かのように聞こえます...読み続けてください。
AMQP には、トピックと呼ばれるタイプの交換があり、メッセージは、ご想像のとおり、メッセージのトピックに基づいてキューにルーティングされます。ログが作成されたホスト名とそのようなログの重大度をトピックとして使用して、アプリケーションに関するログを RabbiMQ トピック交換に送信できます。メッセージ本文はログの内容となり、ルーティング キーは次のようになります。
無制限のログでキューを埋めたくないので、システムを監視したいときに、キューを作成し、トピックに基づいてログ交換に接続するコンシューマを起動できます。たとえば、 、サーバーから報告されたすべてのエラーを確認したいと思います。ルーティング キーは#.errorのようなものになります。このような場合、キュー名を考え出し、それを交換にバインドし、ログを取得し、バインドを解除してキューを削除する必要があります。幸いなことに、キューを宣言してバインドするときに適切なオプションを指定すると、AMPQ はこれを自動的に行う方法を提供します。問題は、これらのオプションをすべて覚えておきたくないことです。このような理由から、匿名消費者パターンを実装しました。
匿名コンシューマを開始すると、そのような詳細は処理されるため、メッセージが到着したときのコールバックの実装について考えるだけで済みます。キュー名を指定せず、RabbitMQ がランダムなキュー名を割り当てるのを待つため、匿名と呼ばれます。
では、このようなコンシューマをどのように構成して実行すればよいでしょうか?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
そこで、交換名とそのタイプ、およびメッセージの到着時に実行する必要があるコールバックを指定します。
この匿名コンシューマは、同じ交換にリンクされ、タイプがtopicであるプロデューサーをリッスンできるようになりました。
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
匿名コンシューマを開始するには、次のコマンドを使用します。
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
これまでに見たコマンドと比較して新しいオプションは、ルーティング キーを指定するオプション-r '#.error'
だけです。
場合によっては、メッセージのバッチを取得して、それらすべてに対して何らかの処理を実行したい場合があります。バッチ コンシューマを使用すると、このタイプの処理のロジックを定義できます。
例: データベースに情報を挿入するためのメッセージを受け取るキューがあるとします。バッチ挿入を実行した方が、1 つずつ挿入するよりもはるかに優れていることがわかりました。
BatchConsumerInterface
を実装するコールバック サービスを定義し、コンシューマの定義を構成に追加します。
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
注: keep_alive
オプションがtrue
に設定されている場合、 idle_timeout_exit_code
は無視され、コンシューマー プロセスは続行されます。
1 回の返信ですべてのメッセージを確認するバッチ コンシューマーを実装することも、どのメッセージを確認するかを制御することもできます。
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
次のバッチ コンシューマーを実行する方法:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
重要: BatchConsumers では、 -m|messages
オプションを使用できません。 重要: BatchConsumers では、特定の数のバッチのみを使用してコンシューマを停止する場合、 -b|batches
オプションを使用することもできます。バッチ メッセージが消費された後にコンシューマを停止したい場合にのみ、バッチの数を指定してください。
STDIN からデータを読み取り、それを RabbitMQ キューに発行するコマンドがあります。これを使用するには、最初に次のように構成ファイルでproducer
サービスを構成する必要があります。
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
そのプロデューサーがwords
を直接交わすメッセージを公開します。もちろん、お好みに合わせて構成を変更することもできます。
次に、いくつかの XML ファイルの内容を公開して、コンシューマのファームで処理できるようにしたいとします。次のようなコマンドを使用するだけで、それらを公開できます。
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
これは、プレーンな Unix コマンドを使用してプロデューサーを作成できることを意味します。
この 1 つのライナーを分解してみましょう。
$ find vendor/symfony/ -name " *.xml " -print0
このコマンドは、symfony フォルダー内のすべての.xml
ファイルを検索し、ファイル名を出力します。これらの各ファイル名は、 xargs
経由でcat
にパイプされます。
$ xargs -0 cat
そして最後に、 cat
の出力は、次のように呼び出されるプロデューサーに直接送られます。
$ ./app/console rabbitmq:stdin-producer words
引数は 1 つだけ取ります。これは、 config.yml
ファイルで設定したプロデューサの名前です。
このバンドルの目的は、アプリケーションでメッセージを生成し、構成したいくつかの交換にメッセージを公開できるようにすることです。
場合によっては、構成が正しい場合でも、作成中のメッセージはキューが存在しないため、どのキューにもルーティングされません。キューを作成するには、キューの消費を担当するコンシューマを実行する必要があります。
コンシューマの数が多い場合、コンシューマごとにコマンドを起動するのは悪夢のような作業になる可能性があります。
エクスチェンジ、キュー、バインディングを一度に作成し、メッセージが失われないようにするには、次のコマンドを実行します。
$ ./app/console rabbitmq:setup-fabric
必要に応じて、RabbitMQ ファブリックがすでに定義されていると想定するようにコンシューマとプロデューサを構成できます。これを行うには、構成に以下を追加します。
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
デフォルトでは、コンシューマーまたはプロデューサーは、RabbitMQ の開始時に必要なものすべてを宣言します。エクスチェンジまたはキューが定義されていない場合、エラーが発生するため、これを使用する場合は注意してください。構成を変更した場合は、上記の setup-fabric コマンドを実行して構成を宣言する必要があります。
貢献するには、新しい機能を追加したり既存の機能を変更したりする場合は、その内容をこの README に文書化する必要があることを考慮して、新しいコードを含むプル リクエストを開くだけです。 BC を破った場合は、それを文書化する必要もあります。また、CHANGELOG を更新する必要があります。それで:
参照: resource/meta/LICENSE.md
バンドル構造とドキュメントは部分的に RedisBundle に基づいています。