RabbitMqBundle
使用 php-amqplib 函式庫透過 RabbitMQ 將訊息傳遞合併到您的應用程式中。
該套件實現了 Thumper 庫中所示的多種訊息傳遞模式。因此,從 Symfony 控制器向 RabbitMQ 發布訊息非常簡單:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
稍後,當您想要使用upload_pictures
佇列中的 50 個訊息時,只需在 CLI 上執行:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
所有範例都需要一個正在運行的 RabbitMQ 伺服器。
該捆綁包在 Symfony Live Paris 2011 會議上發布。請參閱此處的幻燈片。
由於 Symfony >=4.4 引起的重大更改,發布了一個新標籤,使該套件與 Symfony >=4.4 相容。
需要 Composer 的套件及其相依性:
$ composer require php-amqplib/rabbitmq-bundle
註冊捆綁包:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
享受 !
如果您有一個用於運行 RabbitMQ 用戶的控制台應用程序,則不需要 Symfony HttpKernel 和 FrameworkBundle。從版本 1.6 開始,您可以使用依賴注入元件載入此捆綁包配置和服務,然後使用消費者指令。
在您的composer.json 檔案中需要該套件:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
註冊擴展和編譯器通行證:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
自 2012 年 6 月 4 日起,「生產者」配置部分中聲明的交易所的一些預設選項已更改,以匹配「消費者」部分中聲明的交易所的預設選項。受影響的設定有:
durable
由false
變為true
,auto_delete
已從true
更改為false
。如果您依賴先前的預設值,則必須更新您的配置。
自 2012-04-24 起 ConsumerInterface::execute 方法簽章已更改
自 2012 年 1 月 3 日起,消費者執行方法取得整個 AMQP 訊息對象,而不僅僅是正文。有關更多詳細信息,請參閱 CHANGELOG 文件。
在設定檔中新增old_sound_rabbit_mq
部分:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
在這裡,我們配置應用程式將具有的連接服務和訊息端點。在此範例中,您的服務容器將包含服務old_sound_rabbit_mq.upload_picture_producer
Producer 和old_sound_rabbit_mq.upload_picture_consumer
。後者期望有一個名為upload_picture_service
的服務。
如果沒有為客戶端指定連接,則用戶端將尋找具有相同別名的連接。因此,對於我們的upload_picture
服務容器將尋找upload_picture
連接。
如果您需要新增可選的佇列參數,那麼您的佇列選項可以是這樣的:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
訊息 TTL 為 20 秒的另一個範例:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
參數值必須是資料類型和值的清單。有效的資料型態是:
S
- 字串I
整數D
- 十進位T
- 時間戳F
表A
- 數組t
- 布爾根據您的需求調整arguments
。
如果你想將佇列與特定的路由鍵綁定,你可以在生產者或消費者配置中聲明它:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
在 Symfony 環境中,所有服務都會針對每個要求完全引導,從版本 >= 4.3 開始,您可以將服務宣告為惰性服務(Lazy Services)。此捆綁包仍然不支援新的延遲服務功能,但您可以在連線配置中設定lazy: true
以避免每個請求中與訊息代理程式不必要的連線。由於性能原因,強烈建議使用惰性連接,儘管如此,預設情況下會停用惰性選項,以避免已經使用此捆綁包的應用程式可能會中斷。
最好將read_write_timeout
設定為心跳的 2 倍,這樣您的套接字就會開啟。如果您不這樣做,或使用不同的乘數,消費者套接字就有超時的風險。
請記住,如果您的任務運行時間通常超過心跳週期,那麼您可能會遇到問題,並且沒有好的解決方案(連結)。考慮對心跳使用較大的值,或停用心跳以支援 tcp 的keepalive
(在客戶端和伺服器端)和graceful_max_execution_timeout
功能。
您可以為一個連線提供多個主機。這將允許您使用具有多個節點的 RabbitMQ 叢集。
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
注意不能指定
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
分別給每個主機設定參數。
有時您的連線資訊可能需要是動態的。動態連接參數可讓您透過服務以程式設計方式提供或覆寫參數。
例如,在連接的vhost
參數取決於白標應用程式的當前租戶並且您不希望(或不能)每次都更改其配置的情況下。
在connection_parameters_provider
下定義一個實作ConnectionParametersProviderInterface
的服務,並將其加入適當的connections
配置。
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
實施範例:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
在這種情況下, vhost
參數將被getVhost()
的輸出覆寫。
在訊息傳遞應用程式中,向代理程式發送訊息的進程稱為生產者,而接收這些訊息的進程稱為消費者。在您的應用程式中,您將擁有其中的幾個,您可以在配置中各自的條目下列出它們。
生產者將用於向伺服器發送訊息。在 AMQP 模型中,訊息會傳送到交換器,這表示在生產者的配置中,您必須指定連線選項以及交換選項,通常是交換器的名稱及其類型。
現在假設您要在背景處理圖片上傳。將圖片移動到最終位置後,您將向伺服器發布一條訊息,其中包含以下資訊:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
如您所見,如果在您的設定中您有一個名為upload_picture的生產者,那麼在服務容器中您將有一個名為old_sound_rabbit_mq.upload_picture_ Producer 的服務。
除了訊息本身之外, OldSoundRabbitMqBundleRabbitMqProducer#publish()
方法還接受可選的路由鍵參數和可選的附加屬性陣列。附加屬性陣列可讓您變更預設PhpAmqpLibMessageAMQPMessage
物件的屬性。例如,您可以透過這種方式變更應用程式標頭。
您可以使用setContentType和setDeliveryMode方法分別設定訊息內容類型和訊息傳遞模式,覆蓋「生產者」配置部分中的任何預設設定。如果沒有被「生產者」配置或對這些方法的明確呼叫(如下例所示)覆蓋,則內容類型的預設值為text/plain ,交付模式的預設值為 2 。
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
如果您需要為生產者使用自訂類別(應繼承自OldSoundRabbitMqBundleRabbitMqProducer
),您可以使用class
選項:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
下一個難題是讓消費者將訊息從隊列中取出並進行相應的處理。
當前生產者發出了兩個事件。
此事件在發布訊息之前立即發生。這是一個很好的鉤子,可以在實際發送訊息之前進行任何最終日誌記錄、驗證等。監聽器的範例實作:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
該事件在發布訊息後立即發生。這是一個很好的鉤子,可以在實際發送訊息後進行任何確認日誌記錄、提交等。監聽器的範例實作:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
消費者將連接到伺服器並啟動一個循環,等待處理傳入的訊息。其行為將取決於此類消費者的指定回調。讓我們回顧一下上面的消費者配置:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
正如我們所看到的,回呼選項引用了upload_picture_service 。當消費者從伺服器收到訊息時,它將執行此類回調。如果出於測試或調試目的,您需要指定不同的回調,那麼您可以在那裡更改它。
除了回調之外,我們還指定要使用的連接,就像我們使用生產者一樣。剩下的選項是exchange_options和queue_options 。 Exchange_options應該與Producer使用的選項相同。在queue_options中,我們將提供一個佇列名稱。為什麼?
正如我們所說,AMQP 中的消息被發佈到交換器。這並不意味著訊息已到達隊列。為此,首先我們需要建立這樣的佇列,然後將其綁定到交換器。這樣做的一個很酷的事情是,您可以將多個佇列綁定到一個交換器,這樣一條訊息可以到達多個目的地。這種方法的優點是與生產者和消費者解耦。生產者並不關心有多少消費者會處理他的消息。它所需要的只是他的消息到達伺服器。透過這種方式,我們可以擴展每次上傳圖片時執行的操作,而無需更改控制器中的程式碼。
現在,如何運行消費者?有一個命令可以像這樣執行:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
這意味著什麼?我們正在執行upload_picture消費者,告訴它只消費 50 則訊息。每次消費者從伺服器收到訊息時,它將執行配置的回調,將 AMQP 訊息作為PhpAmqpLibMessageAMQPMessage
類別的實例傳遞。訊息正文可以透過呼叫$msg->body
來取得。預設情況下,消費者將在無限循環中處理訊息,以實現某些無盡的定義。
如果你想確保消費者在 Unix 訊號上立即完成執行,你可以使用-w
標誌運行指令。
$ ./app/console rabbitmq:consumer -w upload_picture
那麼消費者就會立即完成執行。
要使用帶有此標誌的命令,您需要安裝帶有 PCNTL 擴充功能的 PHP。
如果您想建立消費者記憶體限制,可以使用標誌-l
來完成。在以下範例中,此標誌新增了 256 MB 記憶體限制。為了避免 PHP 允許的記憶體大小錯誤,消費者將在達到 256MB 之前停止 5MB。
$ ./app/console rabbitmq:consumer -l 256
如果要刪除佇列中等待的所有訊息,可以執行以下命令來清除該佇列:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
若要刪除消費者的佇列,請使用以下命令:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
這在許多場景中都很有用。有 3 個 AMQP 事件:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` 並檢查新應用程式部署。
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
在處理AMQPMessage
之前引發的事件。
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
處理AMQPMessage
後引發的事件。如果進程訊息將拋出異常,則事件將不會引發。
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
當wait
方法因逾時退出而未收到訊息時引發事件。為了利用此事件,必須配置消費者idle_timeout
。預設情況下,進程在空閒逾時時退出,您可以透過在偵聽器中設定$event->setForceStop(false)
來阻止它。
如果您需要在一段時間內佇列沒有訊息時設定逾時,可以設定idle_timeout
(以秒為單位)。 idle_timeout_exit_code
指定當發生空閒逾時時消費者應傳回什麼退出代碼。如果不指定它,消費者將拋出PhpAmqpLibExceptionAMQPTimeoutException
異常。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
設定timeout_wait
以秒為單位。 timeout_wait
指定在確保當前連線仍然有效之前,消費者在沒有收到新訊息的情況下將等待多長時間。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
如果您希望消費者運行到一定時間然後正常退出,請設定graceful_max_execution.timeout
(以秒為單位)。 「優雅退出」意味著消費者將在目前正在運行的任務之後退出,或在等待新任務時立即退出。 graceful_max_execution.exit_code
指定當發生優雅最大執行逾時時,使用者應傳回什麼退出代碼。如果不指定它,消費者將以狀態0
退出。
此功能與 Supervisord 結合使用非常有用,它們一起可以允許定期記憶體洩漏清理、與資料庫/rabbitmq 更新的連接等。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
您可能已經注意到,調度仍然沒有完全按照我們想要的方式工作。例如,在有兩名工作人員的情況下,當所有奇數訊息都很重而偶數訊息都很輕時,一名工作人員將一直忙碌,而另一名工作人員幾乎不會做任何工作。好吧,RabbitMQ 對此一無所知,並且仍然會均勻地分發訊息。
發生這種情況是因為 RabbitMQ 只是在訊息進入佇列時才調度該訊息。它不會查看消費者未確認訊息的數量。它只是盲目地將每條第 n 條訊息分派給第 n 個消費者。
為了解決這個問題,我們可以使用 basic.qos 方法和 prefetch_count=1 設定。這告訴 RabbitMQ 不要一次給一個工作線程多於一條訊息。或者,換句話說,在工作人員處理並確認前一條訊息之前,不要向工作人員發送新訊息。相反,它會將其分派給下一個不忙的工作人員。
取自:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
請小心,因為實施公平調度會引入延遲,從而損害效能(請參閱此部落格文章)。但實現它允許您隨著隊列的增加而動態地水平擴展。您應該按照部落格文章的建議,根據處理每個訊息所需的時間和網路效能來評估 prefetch_size 的正確值。
使用 RabbitMqBundle,您可以為每個消費者設定 qos_options,如下所示:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
如果與Symfony 4.2+一起使用,則在容器中聲明生產者和普通消費者的別名集。這些用於基於聲明的類型和參數名稱的參數自動組裝。這允許您將先前的生產者範例變更為:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
參數名稱由配置中的生產者或消費者名稱建構而成,並根據類型添加生產者或消費者單字後綴。與容器項命名約定相反,如果名稱已經帶有後綴,則單字後綴(生產者或消費者)將不會重複。 upload_picture
生產者鍵將變更為$uploadPictureProducer
參數名稱。 upload_picture_producer
Producer 生產者金鑰也可以別名為$uploadPictureProducer
參數名稱。最好避免以這種方式使用相似的名稱。
所有生產者都被別名OldSoundRabbitMqBundleRabbitMqProducerInterface
和設定中的生產者類別選項。在沙盒模式下,僅建立ProducerInterface
別名。強烈建議在為生產者註入鍵入提示參數時使用ProducerInterface
類別。
所有使用者都別名為OldSoundRabbitMqBundleRabbitMqConsumerInterface
和%old_sound_rabbit_mq.consumer.class%
配置選項值。常規模式和沙盒模式沒有差別。強烈建議在為客戶端注入鍵入提示參數時使用ConsumerInterface
。
這是一個回調範例:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
正如您所看到的,這就像實作一個方法一樣簡單: ConsumerInterface::execute 。
請記住,您的回呼需要註冊為普通的 Symfony 服務。您可以在那裡注入服務容器、資料庫服務、Symfony 記錄器等。
有關訊息實例組成部分的更多詳細信息,請參閱 https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md。
要停止消費者,回呼可以拋出StopConsumerException
(最後消費的消息不會被確認)或AckStopConsumerException
(訊息將被確認)。如果使用妖魔化,例如:主管,消費者實際上將重新啟動。
對於發送訊息來說,這似乎是相當多的工作,讓我們回顧一下以獲得更好的概述。這就是我們產生/消費訊息所需要的:
就是這樣!
這是對接收/發布的消息進行追蹤的要求。為了啟用此功能,您需要為消費者或發布者新增enable_logger
配置。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
如果您願意,也可以透過引用通道phpamqplib
在獨白中處理來自具有不同處理程序的佇列的日誌記錄。
到目前為止,我們只是向消費者發送了訊息,但是如果我們想得到他們的回應怎麼辦?為了實現這一點,我們必須在我們的應用程式中實作 RPC 呼叫。這個捆綁包使得使用 Symfony 實現這些事情變得非常容易。
讓我們在配置中新增 RPC 客戶端和伺服器:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
如需完整設定參考,請使用php app/console config:dump-reference old_sound_rabbit_mq
指令。
這裡我們有一個非常有用的伺服器:它向客戶端傳回隨機整數。用於處理請求的回呼將是random_int_server服務。現在讓我們看看如何從控制器呼叫它。
首先我們必須從命令列啟動伺服器:
$ ./app/console_dev rabbitmq:rpc-server random_int
然後將以下程式碼新增到我們的控制器中:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
如您所看到的,如果我們的客戶端 ID 是integer_store ,服務名稱將為old_sound_rabbit_mq.integer_store_rpc 。取得該物件後,我們透過呼叫需要三個參數的addRequest
向伺服器發出請求:
我們發送的參數是rand()
函數的最小值和最大值。我們透過序列化數組來發送它們。如果我們的伺服器需要 JSON 資訊或 XML,我們將在此處發送此類資料。
最後一步是得到回覆。我們的 PHP 腳本將會阻塞,直到伺服器傳回一個值。 $replies變數將是一個關聯數組,其中來自伺服器的每個回應都將包含在對應的request_id鍵中。
預設情況下,RPC 客戶端希望回應被序列化。如果您正在使用的伺服器傳回非序列化結果,則將 RPC 用戶端expect_serialized_response
選項設為 false。例如,如果integer_store伺服器沒有序列化結果,客戶端將設定如下:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
您也可以設定請求的過期時間(以毫秒為單位),之後伺服器將不再處理訊息,客戶端請求將簡單地逾時。設定訊息過期僅適用於 RabbitMQ 3.x 及更高版本。請造訪 http://www.rabbitmq.com/ttl.html#per-message-ttl 以了解更多資訊。
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
正如您所猜測的,我們也可以進行並行 RPC 呼叫。
假設為了渲染某個網頁,您需要執行兩次資料庫查詢,一次需要 5 秒才能完成,另一次需要 2 秒——非常昂貴的查詢——。如果您按順序執行它們,那麼您的頁面將在大約 7 秒內準備好交付。如果並行運行它們,那麼您的頁面將在大約 5 秒內得到服務。使用RabbitMqBundle
我們可以輕鬆地進行此類並行呼叫。讓我們在配置中定義一個並行客戶端和另一個 RPC 伺服器:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
然後這段程式碼應該會進入我們的控制器:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
與前面的範例非常相似,我們只是有一個額外的addRequest
呼叫。此外,我們還提供了有意義的請求標識符,以便稍後我們可以更輕鬆地在$replies數組中找到我們想要的回應。
若要啟用直接回覆客戶端,您只需在客戶端的rpc_clients設定上啟用選項direct_reply_to 。
此選項在進行 RPC 呼叫時將使用偽佇列amq.rabbitmq.reply-to 。在 RPC 伺服器上無需進行任何修改。
從版本 3.5.0 開始,RabbitMQ 在核心中實作了優先權佇列。任何佇列都可以使用客戶端提供的可選參數轉換為優先佇列(但是,與使用可選參數而不是策略的其他功能不同)。此實作支援有限數量的優先順序:255 個。檢查文檔
這是聲明優先權隊列的方法
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
如果upload-picture
佇列存在,則必須在執行rabbitmq:setup-fabric
指令之前刪除該佇列
現在假設您想要發布一條具有高優先級的訊息,您必須使用此附加資訊來發布該訊息
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
有很多隊列用於邏輯分離是一個很好的實踐。對於一個簡單的消費者,您必須為每個隊列建立一個工作人員(消費者),並且在處理許多演變時可能很難管理(忘記在supervisord配置中添加一行?)。這對於小型佇列也很有用,因為您可能不希望擁有與佇列一樣多的工作人員,並且希望在不失去靈活性和分離原則的情況下將一些任務重新組合在一起。
多個消費者允許您透過偵聽同一消費者上的多個隊列來處理此用例。
以下是如何設定具有多個隊列的消費者:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
現在,回調在每個佇列下指定,並且必須像簡單的消費者一樣實作ConsumerInterface
。消費者中queues-options
的所有選項對於每個隊列都是可用的。
請注意,所有佇列都位於同一個交換器下,您可以為回呼設定正確的路由。
queues_provider
是動態提供佇列的可選服務。它必須實作QueuesProviderInterface
。
請注意,佇列提供者負責對setDequeuer
的正確調用,並且回調是可調用的(而不是ConsumerInterface
)。如果服務提供佇列實作DequeuerAwareInterface
,則對setDequeuer
呼叫將會加入到服務的定義中,而DequeuerInterface
目前是MultipleConsumer
。
您可能會發現您的應用程式具有複雜的工作流程,並且您需要進行任意綁定。任意綁定場景可能包括透過destination_is_exchange
屬性進行交換到交換的綁定。
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
在建立任意綁定之前,rabbitmq:setup-fabric 指令將聲明生產者、消費者和多消費者設定中定義的交換和佇列。但是,rabbitmq:setup-fabric不會宣告綁定中定義的附加佇列和交換器。由您來確保聲明交換/佇列。
有時您必須即時變更消費者的配置。動態消費者可讓您根據上下文以程式定義消費者隊列選項。
例如,在定義的使用者必須負責動態數量的主題並且您不希望(或不能)每次都更改其配置的情況下。
定義一個實作QueueOptionsProviderInterface
的服務queue_options_provider
,並將其新增至您的dynamic_consumers
配置。
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
用法範例:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
在這種情況下, proc_logs
消費者為server1
運行,它可以決定它使用的佇列選項。
現在,為什麼我們需要匿名消費者?這聽起來像是某種網路威脅之類的…繼續閱讀。
在 AMQP 中,有一種稱為主題的交換類型,其中訊息根據(您猜的)訊息主題路由到佇列。我們可以使用建立日誌的主機名稱以及此類日誌的嚴重性作為主題,將有關應用程式的日誌傳送至 RabbiMQ 主題交換。訊息正文將是日誌內容,我們的路由鍵將如下所示:
由於我們不想用無限的日誌填充佇列,所以我們可以做的是,當我們想要監視系統時,我們可以啟動一個消費者,它會建立一個佇列並根據某個主題附加到日誌交換,例如,我們希望看到我們的伺服器報告的所有錯誤。路由鍵類似: #.error 。在這種情況下,我們必須提供一個佇列名稱,將其綁定到交換器,取得日誌,解除綁定並刪除佇列。令人高興的是,如果您在聲明和綁定佇列時提供正確的選項,AMPQ 提供了一種自動執行此操作的方法。問題是您不想記住所有這些選項。基於這個原因,我們實現了匿名消費者模式。
當我們啟動匿名消費者時,它會處理這些細節,我們只需要考慮在訊息到達時實現回調。之所以稱為 Anonymous,是因為它不會指定佇列名稱,但會等待 RabbitMQ 為其分配一個隨機名稱。
現在,如何配置和運作這樣的消費者?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
在那裡,我們指定交換名稱及其類型以及訊息到達時應執行的回調。
這個匿名消費者現在能夠收聽生產者的聲音,這些生產者連結到相同的交換和主題類型:
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
要啟動匿名消費者,我們使用以下命令:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
與我們之前見過的命令相比,唯一的新選項是指定路由鍵的選項: -r '#.error'
。
在某些情況下,您需要取得一批訊息,然後對所有訊息進行一些處理。批次消費者將允許您定義此類處理的邏輯。
例如:想像一下,您有一個隊列,您收到一條訊息,需要在資料庫中插入一些訊息,並且您意識到,如果進行批量插入,則比逐一插入要好得多。
定義一個實作BatchConsumerInterface
的回呼服務,並將使用者的定義加入您的配置。
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
注意:如果keep_alive
選項設定為true
, idle_timeout_exit_code
將被忽略,消費者進程將繼續。
您可以實現一個批次消費者,它將在一次返回中確認所有訊息,或者您可以控制要確認的訊息。
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
如何運行以下批次消費者:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
重要提示:BatchConsumers 將不具有可用的-m|messages
選項 重要提示:如果您只想使用特定數量的批次,然後停止用戶,BatchConsumers 也可以使用-b|batches
選項。只有當您希望消費者在消費完這些批次訊息後停止時,才提供批次數!
有一個命令從 STDIN 讀取資料並將其發佈到 RabbitMQ 佇列。要先使用它,您必須在設定檔中設定producer
服務,如下所示:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
該生產者將向words
直接交換發布訊息。當然,您可以根據自己的喜好調整配置。
然後,假設您想要發佈一些 XML 檔案的內容,以便由消費者群處理它們。您只需使用以下命令即可發布它們:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
這意味著您可以使用普通的 Unix 命令來編寫生成器。
讓我們分解一下這個襯裡:
$ find vendor/symfony/ -name " *.xml " -print0
該命令將查找 symfony 資料夾內的所有.xml
檔案並列印檔案名稱。然後,每個檔名都透過xargs
透過管道傳輸到cat
:
$ xargs -0 cat
最後cat
的輸出直接送到我們的生產者,如下所示:
$ ./app/console rabbitmq:stdin-producer words
它只需要一個參數,也就是您在config.yml
檔案中配置的生產者的名稱。
該捆綁包的目的是讓您的應用程式產生訊息並將其發佈到您配置的某些交換器。
在某些情況下,即使您的配置正確,您產生的訊息也不會被路由到任何佇列,因為不存在任何佇列。負責隊列消費的消費者必須運作才能創建隊列。
當消費者數量較多時,為每個消費者啟動命令可能是一場噩夢。
為了立即建立交換器、佇列和綁定並確保不會遺失任何訊息,您可以執行以下命令:
$ ./app/console rabbitmq:setup-fabric
如果需要,您可以設定消費者和生產者以假設 RabbitMQ 結構已定義。為此,請將以下內容新增至您的配置:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
預設情況下,消費者或生產者將在 RabbitMQ 啟動時聲明其所需的一切。使用這個要小心,當沒有定義交換器或佇列時,會出現錯誤。當您更改任何配置時,您需要執行上述 setup-fabric 命令來聲明您的配置。
要做出貢獻,只需使用您的新程式碼開啟 Pull 請求,並考慮到如果您新增功能或修改現有功能,您必須在本自述文件中記錄它們的作用。如果你破壞了 BC,那麼你也必須記錄下來。您還必須更新 CHANGELOG。所以:
請參閱:resources/meta/LICENSE.md
套件結構和文件部分基於RedisBundle