RabbitMqBundle
使用 php-amqplib 库通过 RabbitMQ 将消息传递合并到您的应用程序中。
该包实现了 Thumper 库中所示的多种消息传递模式。因此,从 Symfony 控制器向 RabbitMQ 发布消息非常简单:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
稍后,当您想要使用upload_pictures
队列中的 50 条消息时,只需在 CLI 上运行:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
所有示例都需要一个正在运行的 RabbitMQ 服务器。
该捆绑包在 Symfony Live Paris 2011 会议上发布。请参阅此处的幻灯片。
由于 Symfony >=4.4 引起的重大更改,发布了一个新标签,使该包与 Symfony >=4.4 兼容。
需要 Composer 的包及其依赖项:
$ composer require php-amqplib/rabbitmq-bundle
注册捆绑包:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
享受 !
如果您有一个用于运行 RabbitMQ 使用者的控制台应用程序,则不需要 Symfony HttpKernel 和 FrameworkBundle。从版本 1.6 开始,您可以使用依赖注入组件加载此捆绑包配置和服务,然后使用消费者命令。
在您的composer.json 文件中需要该包:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
注册扩展和编译器通行证:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
自 2012 年 6 月 4 日起,“生产者”配置部分中声明的交易所的一些默认选项已更改,以匹配“消费者”部分中声明的交易所的默认选项。受影响的设置有:
durable
由false
变为true
,auto_delete
已从true
更改为false
。如果您依赖以前的默认值,则必须更新您的配置。
自 2012-04-24 起 ConsumerInterface::execute 方法签名已更改
自 2012 年 1 月 3 日起,消费者执行方法获取整个 AMQP 消息对象,而不仅仅是正文。有关更多详细信息,请参阅 CHANGELOG 文件。
在配置文件中添加old_sound_rabbit_mq
部分:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
在这里,我们配置应用程序将具有的连接服务和消息端点。在此示例中,您的服务容器将包含服务old_sound_rabbit_mq.upload_picture_producer
Producer 和old_sound_rabbit_mq.upload_picture_consumer
。后者期望有一个名为upload_picture_service
的服务。
如果没有为客户端指定连接,客户端将寻找具有相同别名的连接。因此,对于我们的upload_picture
服务容器将寻找upload_picture
连接。
如果您需要添加可选的队列参数,那么您的队列选项可以是这样的:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
消息 TTL 为 20 秒的另一个示例:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
参数值必须是数据类型和值的列表。有效的数据类型是:
S
- 字符串I
整数D
- 十进制T
- 时间戳F
表A
- 数组t
- 布尔根据您的需要调整arguments
。
如果你想将队列与特定的路由键绑定,你可以在生产者或消费者配置中声明它:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
在 Symfony 环境中,所有服务都会针对每个请求完全引导,从版本 >= 4.3 开始,您可以将服务声明为惰性服务(Lazy Services)。该捆绑包仍然不支持新的延迟服务功能,但您可以在连接配置中设置lazy: true
以避免每个请求中与消息代理的不必要连接。由于性能原因,强烈建议使用惰性连接,尽管如此,默认情况下禁用惰性选项,以避免已经使用此捆绑包的应用程序可能出现中断。
最好将read_write_timeout
设置为心跳的 2 倍,这样您的套接字就会打开。如果您不这样做,或者使用不同的乘数,消费者套接字就有超时的风险。
请记住,如果您的任务运行时间通常超过心跳周期,那么您可能会遇到问题,并且没有好的解决方案(链接)。考虑对心跳使用较大的值,或者禁用心跳以支持 tcp 的keepalive
(在客户端和服务器端)和graceful_max_execution_timeout
功能。
您可以为一个连接提供多个主机。这将允许您使用具有多个节点的 RabbitMQ 集群。
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
注意不能指定
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
分别给每个主机设置参数。
有时您的连接信息可能需要是动态的。动态连接参数允许您通过服务以编程方式提供或覆盖参数。
例如,在连接的vhost
参数取决于白标应用程序的当前租户并且您不希望(或不能)每次都更改其配置的情况下。
在connection_parameters_provider
下定义一个实现ConnectionParametersProviderInterface
的服务,并将其添加到适当的connections
配置中。
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
实施示例:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
在这种情况下, vhost
参数将被getVhost()
的输出覆盖。
在消息传递应用程序中,向代理发送消息的进程称为生产者,而接收这些消息的进程称为消费者。在您的应用程序中,您将拥有其中的几个,您可以在配置中各自的条目下列出它们。
生产者将用于向服务器发送消息。在 AMQP 模型中,消息被发送到交换器,这意味着在生产者的配置中,您必须指定连接选项以及交换选项,通常是交换器的名称及其类型。
现在假设您要在后台处理图片上传。将图片移动到最终位置后,您将向服务器发布一条消息,其中包含以下信息:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
如您所见,如果在您的配置中您有一个名为upload_picture的生产者,那么在服务容器中您将有一个名为old_sound_rabbit_mq.upload_picture_ Producer 的服务。
除了消息本身之外, OldSoundRabbitMqBundleRabbitMqProducer#publish()
方法还接受可选的路由键参数和可选的附加属性数组。附加属性数组允许您更改默认构造PhpAmqpLibMessageAMQPMessage
对象的属性。例如,您可以通过这种方式更改应用程序标头。
您可以使用setContentType和setDeliveryMode方法分别设置消息内容类型和消息传递模式,覆盖“生产者”配置部分中的任何默认设置。如果没有被“生产者”配置或对这些方法的显式调用(如下例所示)覆盖,则内容类型的默认值为text/plain ,交付模式的默认值为 2 。
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
如果您需要为生产者使用自定义类(应继承自OldSoundRabbitMqBundleRabbitMqProducer
),您可以使用class
选项:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
下一个难题是让消费者将消息从队列中取出并进行相应的处理。
当前生产者发出了两个事件。
此事件在发布消息之前立即发生。这是一个很好的钩子,可以在实际发送消息之前进行任何最终日志记录、验证等。监听器的示例实现:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
该事件在发布消息后立即发生。这是一个很好的钩子,可以在实际发送消息后进行任何确认日志记录、提交等。监听器的示例实现:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
消费者将连接到服务器并启动一个循环,等待处理传入的消息。其行为将取决于此类消费者的指定回调。让我们回顾一下上面的消费者配置:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
正如我们所看到的,回调选项引用了upload_picture_service 。当消费者从服务器收到消息时,它将执行此类回调。如果出于测试或调试目的,您需要指定不同的回调,那么您可以在那里更改它。
除了回调之外,我们还指定要使用的连接,就像我们使用生产者一样。剩下的选项是exchange_options和queue_options 。 Exchange_options应该与Producer使用的选项相同。在queue_options中,我们将提供一个队列名称。为什么?
正如我们所说,AMQP 中的消息被发布到交换器。这并不意味着消息已到达队列。为此,首先我们需要创建这样的队列,然后将其绑定到交换器。这样做的一个很酷的事情是,您可以将多个队列绑定到一个交换器,这样一条消息可以到达多个目的地。这种方法的优点是与生产者和消费者解耦。生产者并不关心有多少消费者会处理他的消息。它所需要的只是他的消息到达服务器。通过这种方式,我们可以扩展每次上传图片时执行的操作,而无需更改控制器中的代码。
现在,如何运行消费者?有一个命令可以像这样执行:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
这意味着什么?我们正在执行upload_picture消费者,告诉它只消费 50 条消息。每次消费者从服务器收到消息时,它将执行配置的回调,将 AMQP 消息作为PhpAmqpLibMessageAMQPMessage
类的实例传递。消息正文可以通过调用$msg->body
获取。默认情况下,消费者将在无限循环中处理消息,以实现某些无尽的定义。
如果你想确保消费者在 Unix 信号上立即完成执行,你可以使用-w
标志运行命令。
$ ./app/console rabbitmq:consumer -w upload_picture
那么消费者就会立即完成执行。
要使用带有此标志的命令,您需要安装带有 PCNTL 扩展的 PHP。
如果您想建立消费者内存限制,可以使用标志-l
来完成。在以下示例中,此标志添加了 256 MB 内存限制。为了避免 PHP 允许的内存大小错误,消费者将在达到 256MB 之前停止 5MB。
$ ./app/console rabbitmq:consumer -l 256
如果要删除队列中等待的所有消息,可以执行以下命令来清除该队列:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
要删除消费者的队列,请使用以下命令:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
这在许多场景中都很有用。有 3 个 AMQP 事件:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` 并检查新应用程序部署。
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
在处理AMQPMessage
之前引发的事件。
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
处理AMQPMessage
后引发的事件。如果进程消息将抛出异常,则事件将不会引发。
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
当wait
方法因超时退出而未收到消息时引发事件。为了利用此事件,必须配置消费者idle_timeout
。默认情况下,进程在空闲超时时退出,您可以通过在侦听器中设置$event->setForceStop(false)
来阻止它。
如果您需要在一段时间内队列没有消息时设置超时,可以设置idle_timeout
(以秒为单位)。 idle_timeout_exit_code
指定当发生空闲超时时消费者应返回什么退出代码。如果不指定它,消费者将抛出PhpAmqpLibExceptionAMQPTimeoutException
异常。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
设置timeout_wait
以秒为单位。 timeout_wait
指定在确保当前连接仍然有效之前,消费者在没有收到新消息的情况下将等待多长时间。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
如果您希望消费者运行到一定时间然后正常退出,请设置graceful_max_execution.timeout
(以秒为单位)。 “优雅退出”意味着消费者将在当前正在运行的任务之后退出,或者在等待新任务时立即退出。 graceful_max_execution.exit_code
指定当发生优雅最大执行超时时,使用者应返回什么退出代码。如果不指定它,消费者将以状态0
退出。
此功能与 Supervisord 结合使用非常有用,它们一起可以允许定期内存泄漏清理、与数据库/rabbitmq 更新的连接等。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两名工作人员的情况下,当所有奇数消息都很重而偶数消息都很轻时,一名工作人员将一直忙碌,而另一名工作人员几乎不会做任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。
发生这种情况是因为 RabbitMQ 只是在消息进入队列时才调度该消息。它不会查看消费者未确认消息的数量。它只是盲目地将每条第 n 条消息分派给第 n 个消费者。
为了解决这个问题,我们可以使用 basic.qos 方法和 prefetch_count=1 设置。这告诉 RabbitMQ 不要一次给一个工作线程多于一条消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将其分派给下一个不忙的工作人员。
来自:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
请小心,因为实施公平调度会引入延迟,从而损害性能(请参阅此博文)。但实现它允许您随着队列的增加而动态地水平扩展。您应该按照博客文章的建议,根据处理每条消息所需的时间和网络性能来评估 prefetch_size 的正确值。
使用 RabbitMqBundle,您可以为每个消费者配置 qos_options,如下所示:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
如果与Symfony 4.2+一起使用,则在容器中声明生产者和普通消费者的别名集。这些用于基于声明的类型和参数名称的参数自动装配。这允许您将之前的生产者示例更改为:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
参数名称由配置中的生产者或消费者名称构造而成,并根据类型添加生产者或消费者单词后缀。与容器项命名约定相反,如果名称已经带有后缀,则单词后缀(生产者或消费者)将不会重复。 upload_picture
生产者键将更改为$uploadPictureProducer
参数名称。 upload_picture_producer
Producer 生产者密钥也可以别名为$uploadPictureProducer
参数名称。最好避免以这种方式使用相似的名称。
所有生产者都被别名为OldSoundRabbitMqBundleRabbitMqProducerInterface
和配置中的生产者类选项。在沙盒模式下,仅创建ProducerInterface
别名。强烈建议在为生产者注入键入提示参数时使用ProducerInterface
类。
所有使用者都别名为OldSoundRabbitMqBundleRabbitMqConsumerInterface
和%old_sound_rabbit_mq.consumer.class%
配置选项值。常规模式和沙盒模式没有区别。强烈建议在为客户端注入键入提示参数时使用ConsumerInterface
。
这是一个回调示例:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
正如您所看到的,这就像实现一个方法一样简单: ConsumerInterface::execute 。
请记住,您的回调需要注册为普通的 Symfony 服务。您可以在那里注入服务容器、数据库服务、Symfony 记录器等。
有关消息实例组成部分的更多详细信息,请参阅 https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md。
要停止消费者,回调可以抛出StopConsumerException
(最后消费的消息不会被确认)或AckStopConsumerException
(消息将被确认)。如果使用妖魔化,例如:主管,消费者实际上将重新启动。
对于发送消息来说,这似乎是相当多的工作,让我们回顾一下以获得更好的概述。这就是我们生成/消费消息所需要的:
就是这样!
这是对接收/发布的消息进行跟踪的要求。为了启用此功能,您需要向消费者或发布者添加enable_logger
配置。
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
如果您愿意,还可以通过引用通道phpamqplib
在独白中处理来自具有不同处理程序的队列的日志记录。
到目前为止,我们只是向消费者发送了消息,但是如果我们想得到他们的回复怎么办?为了实现这一点,我们必须在我们的应用程序中实现 RPC 调用。这个捆绑包使得使用 Symfony 实现这些事情变得非常容易。
让我们在配置中添加 RPC 客户端和服务器:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
如需完整配置参考,请使用php app/console config:dump-reference old_sound_rabbit_mq
命令。
这里我们有一个非常有用的服务器:它向客户端返回随机整数。用于处理请求的回调将是random_int_server服务。现在让我们看看如何从控制器调用它。
首先我们必须从命令行启动服务器:
$ ./app/console_dev rabbitmq:rpc-server random_int
然后将以下代码添加到我们的控制器中:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
正如您所看到的,如果我们的客户端 ID 是integer_store ,那么服务名称将为old_sound_rabbit_mq.integer_store_rpc 。获得该对象后,我们通过调用需要三个参数的addRequest
向服务器发出请求:
我们发送的参数是rand()
函数的最小值和最大值。我们通过序列化数组来发送它们。如果我们的服务器需要 JSON 信息或 XML,我们将在此处发送此类数据。
最后一步是得到回复。我们的 PHP 脚本将阻塞,直到服务器返回一个值。 $replies变量将是一个关联数组,其中来自服务器的每个回复都将包含在相应的request_id键中。
默认情况下,RPC 客户端希望响应被序列化。如果您正在使用的服务器返回非序列化结果,则将 RPC 客户端expect_serialized_response
选项设置为 false。例如,如果integer_store服务器没有序列化结果,客户端将设置如下:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
您还可以设置请求的过期时间(以毫秒为单位),之后服务器将不再处理消息,客户端请求将简单地超时。设置消息过期仅适用于 RabbitMQ 3.x 及更高版本。访问 http://www.rabbitmq.com/ttl.html#per-message-ttl 了解更多信息。
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
正如您所猜到的,我们还可以进行并行 RPC 调用。
假设为了渲染某个网页,您需要执行两次数据库查询,一次需要 5 秒才能完成,另一次需要 2 秒——非常昂贵的查询——。如果您按顺序执行它们,那么您的页面将在大约 7 秒内准备好交付。如果并行运行它们,那么您的页面将在大约 5 秒内得到服务。使用RabbitMqBundle
我们可以轻松地进行此类并行调用。让我们在配置中定义一个并行客户端和另一个 RPC 服务器:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
然后这段代码应该进入我们的控制器:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
与前面的示例非常相似,我们只是有一个额外的addRequest
调用。此外,我们还提供了有意义的请求标识符,以便稍后我们可以更轻松地在$replies数组中找到我们想要的回复。
要启用直接回复客户端,您只需在客户端的rpc_clients配置上启用选项direct_reply_to 。
该选项在进行 RPC 调用时将使用伪队列amq.rabbitmq.reply-to 。在 RPC 服务器上无需进行任何修改。
从版本 3.5.0 开始,RabbitMQ 在核心中实现了优先级队列。任何队列都可以使用客户端提供的可选参数转换为优先队列(但是,与使用可选参数而不是策略的其他功能不同)。该实现支持有限数量的优先级:255 个。建议使用 1 到 10 之间的值。检查文档
这是声明优先级队列的方法
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
如果upload-picture
队列存在,则必须在运行rabbitmq:setup-fabric
命令之前删除该队列
现在假设您想要发布一条具有高优先级的消息,您必须使用此附加信息来发布该消息
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
有很多队列用于逻辑分离是一个很好的实践。对于一个简单的消费者,您必须为每个队列创建一个工作人员(消费者),并且在处理许多演变时可能很难管理(忘记在supervisord配置中添加一行?)。这对于小型队列也很有用,因为您可能不希望拥有与队列一样多的工作人员,并且希望在不失去灵活性和分离原则的情况下将一些任务重新组合在一起。
多个消费者允许您通过侦听同一消费者上的多个队列来处理此用例。
以下是如何设置具有多个队列的消费者:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
现在,回调在每个队列下指定,并且必须像简单的消费者一样实现ConsumerInterface
。消费者中queues-options
的所有选项对于每个队列都是可用的。
请注意,所有队列都位于同一个交换器下,您可以为回调设置正确的路由。
queues_provider
是动态提供队列的可选服务。它必须实现QueuesProviderInterface
。
请注意,队列提供者负责对setDequeuer
的正确调用,并且回调是可调用的(而不是ConsumerInterface
)。如果服务提供队列实现DequeuerAwareInterface
,则对setDequeuer
调用将添加到服务的定义中,并且DequeuerInterface
当前是MultipleConsumer
。
您可能会发现您的应用程序具有复杂的工作流程,并且您需要进行任意绑定。任意绑定场景可能包括通过destination_is_exchange
属性进行交换到交换的绑定。
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
在创建任意绑定之前,rabbitmq:setup-fabric 命令将声明生产者、消费者和多消费者配置中定义的交换和队列。但是,rabbitmq:setup-fabric不会声明绑定中定义的附加队列和交换器。由您来确保声明交换/队列。
有时您必须即时更改消费者的配置。动态消费者允许您根据上下文以编程方式定义消费者队列选项。
例如,在定义的使用者必须负责动态数量的主题并且您不希望(或不能)每次都更改其配置的情况下。
定义一个实现QueueOptionsProviderInterface
的服务queue_options_provider
,并将其添加到您的dynamic_consumers
配置中。
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
用法示例:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
在这种情况下, proc_logs
消费者为server1
运行,它可以决定它使用的队列选项。
现在,为什么我们需要匿名消费者?这听起来像是某种互联网威胁之类的……继续阅读。
在 AMQP 中,有一种称为主题的交换类型,其中消息根据(您猜的)消息主题路由到队列。我们可以使用创建日志的主机名以及此类日志的严重性作为主题,将有关应用程序的日志发送到 RabbiMQ 主题交换。消息正文将是日志内容,我们的路由键将如下所示:
由于我们不想用无限的日志填充队列,所以我们可以做的是,当我们想要监视系统时,我们可以启动一个消费者,它创建一个队列并根据某个主题附加到日志交换,例如,我们希望看到我们的服务器报告的所有错误。路由键类似于: #.error 。在这种情况下,我们必须提供一个队列名称,将其绑定到交换器,获取日志,解除绑定并删除队列。令人高兴的是,如果您在声明和绑定队列时提供正确的选项,AMPQ 提供了一种自动执行此操作的方法。问题是您不想记住所有这些选项。出于这个原因,我们实现了匿名消费者模式。
当我们启动匿名消费者时,它会处理这些细节,我们只需要考虑在消息到达时实现回调。之所以称为 Anonymous,是因为它不会指定队列名称,但会等待 RabbitMQ 为其分配一个随机名称。
现在,如何配置和运行这样的消费者?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
在那里,我们指定交换名称及其类型以及消息到达时应执行的回调。
这个匿名消费者现在能够收听生产者的声音,这些生产者链接到相同的交换和主题类型:
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
要启动匿名消费者,我们使用以下命令:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
与我们之前见过的命令相比,唯一的新选项是指定路由键的选项: -r '#.error'
。
在某些情况下,您需要获取一批消息,然后对所有消息进行一些处理。批处理消费者将允许您定义此类处理的逻辑。
例如:想象一下,您有一个队列,您收到一条消息,需要在数据库中插入一些信息,并且您意识到,如果进行批量插入,则比逐一插入要好得多。
定义一个实现BatchConsumerInterface
的回调服务,并将使用者的定义添加到您的配置中。
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
注意:如果keep_alive
选项设置为true
, idle_timeout_exit_code
将被忽略,消费者进程将继续。
您可以实现一个批处理消费者,它将在一次返回中确认所有消息,或者您可以控制要确认的消息。
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
如何运行以下批处理消费者:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
重要提示:BatchConsumers 将不具有可用的-m|messages
选项 重要提示:如果您只想使用特定数量的批次,然后停止使用者,BatchConsumers 还可以使用-b|batches
选项。仅当您希望消费者在消费完这些批次消息后停止时,才提供批次数!
有一个命令从 STDIN 读取数据并将其发布到 RabbitMQ 队列。要首先使用它,您必须在配置文件中配置producer
服务,如下所示:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
该生产者将向words
直接交换发布消息。当然,您可以根据自己的喜好调整配置。
然后,假设您想要发布一些 XML 文件的内容,以便由消费者群处理它们。您只需使用如下命令即可发布它们:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
这意味着您可以使用普通的 Unix 命令来编写生成器。
让我们分解一下这个衬里:
$ find vendor/symfony/ -name " *.xml " -print0
该命令将查找 symfony 文件夹内的所有.xml
文件并打印文件名。然后,每个文件名都通过xargs
通过管道传输到cat
:
$ xargs -0 cat
最后cat
的输出直接发送到我们的生产者,如下所示:
$ ./app/console rabbitmq:stdin-producer words
它只需要一个参数,即您在config.yml
文件中配置的生产者的名称。
该捆绑包的目的是让您的应用程序生成消息并将其发布到您配置的某些交换器。
在某些情况下,即使您的配置正确,您生成的消息也不会被路由到任何队列,因为不存在任何队列。负责队列消费的消费者必须运行才能创建队列。
当消费者数量较多时,为每个消费者启动命令可能是一场噩梦。
为了立即创建交换器、队列和绑定并确保不会丢失任何消息,您可以运行以下命令:
$ ./app/console rabbitmq:setup-fabric
如果需要,您可以配置消费者和生产者以假设 RabbitMQ 结构已定义。为此,请将以下内容添加到您的配置中:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
默认情况下,消费者或生产者将在 RabbitMQ 启动时声明其所需的一切。使用这个要小心,当没有定义交换器或队列时,会出现错误。当您更改任何配置时,您需要运行上述 setup-fabric 命令来声明您的配置。
要做出贡献,只需使用您的新代码打开一个 Pull 请求,并考虑到如果您添加新功能或修改现有功能,您必须在本自述文件中记录它们的作用。如果你破坏了 BC,那么你也必须记录下来。您还必须更新 CHANGELOG。所以:
请参阅:resources/meta/LICENSE.md
包结构和文档部分基于RedisBundle