只有最新版本才会获得新功能。将使用以下方案提供错误修复:
套餐版本 | Laravel 版本 | 错误修复直至 | |
---|---|---|---|
13 | 9 | 2023 年 8 月 8 日 | 文档 |
您可以使用以下命令通过 Composer 安装此软件包:
composer require vladimir-yuldashev/laravel-queue-rabbitmq
该包将自动注册。
添加到config/queue.php
的连接:
这是rabbitMQ 连接/驱动程序工作的最小配置。
' connections ' => [
// ...
' rabbitmq ' => [
' driver ' => ' rabbitmq ' ,
' hosts ' => [
[
' host ' => env ( ' RABBITMQ_HOST ' , ' 127.0.0.1 ' ),
' port ' => env ( ' RABBITMQ_PORT ' , 5672 ),
' user ' => env ( ' RABBITMQ_USER ' , ' guest ' ),
' password ' => env ( ' RABBITMQ_PASSWORD ' , ' guest ' ),
' vhost ' => env ( ' RABBITMQ_VHOST ' , ' / ' ),
],
// ...
],
// ...
],
// ...
],
可以选择将队列选项添加到连接的配置中。为此连接创建的每个队列都会获取属性。
当您想要在延迟时优先处理消息时,可以通过添加额外的选项来实现。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' prioritize_delayed ' => false ,
' queue_max_priority ' => 10 ,
],
],
],
// ...
],
当您想要针对带有路由密钥的交换发布消息时,可以通过添加额外的选项来实现。
amq.direct
交换作为路由密钥queue
名称。%s
时,queue_name 将被替换。注意:当使用带有路由密钥的交换时,您可能自己创建带有绑定的队列。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' exchange ' => ' application-x ' ,
' exchange_type ' => ' topic ' ,
' exchange_routing_key ' => '' ,
],
],
],
// ...
],
在 Laravel 中,失败的作业被存储到数据库中。但也许您想指示其他进程也对消息执行某些操作。当您想要指示 RabbitMQ 将失败的消息重新路由到交换器或特定队列时,可以通过添加额外的选项来实现。
amq.direct
交换作为路由密钥queue
名称将替换为'.failed'
。%s
时,queue_name 将被替换。注意:当使用 failed_job 交换与路由键时,您可能需要自己创建带有绑定的交换/队列。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' reroute_failed ' => true ,
' failed_exchange ' => ' failed-exchange ' ,
' failed_routing_key ' => ' application-x.%s ' ,
],
],
],
// ...
],
从 8.0 开始,该软件包开箱即用地支持 Laravel Horizon。首先,安装 Horizon ,然后将RABBITMQ_WORKER
设置为horizon
。
Horizon 取决于工作人员调度的事件。这些事件通知 Horizon 对消息/作业做了什么。
这个库支持 Horizon,但是在配置中你必须通知 Laravel 使用与 Horizon 兼容的 QueueApi。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to "horizon" if you wish to use Laravel Horizon. */
' worker ' => env ( ' RABBITMQ_WORKER ' , ' default ' ),
],
// ...
],
有时您必须处理另一个应用程序发布的消息。
这些消息可能不尊重 Laravel 的作业负载模式。这些消息的问题在于,Laravel 工作人员将无法确定要执行的实际作业或类。
您可以扩展内置的RabbitMQJob::class
,并且在队列连接配置中,您可以定义自己的类。当您在配置中使用您自己的类名指定job
键时,从代理检索的每条消息都将由您自己的类包装。
配置示例:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' job ' => App Queue Jobs RabbitMQJob::class,
],
],
],
// ...
],
您自己的工作类别的示例:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Fire the job.
*
* @return void
*/
public function fire ()
{
$ payload = $ this -> payload ();
$ class = WhatheverClassNameToExecute::class;
$ method = ' handle ' ;
( $ this -> instance = $ this -> resolve ( $ class ))->{ $ method }( $ this , $ payload );
$ this -> delete ();
}
}
或者您可能想向有效负载添加额外的属性:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Get the decoded body of the job.
*
* @return array
*/
public function payload ()
{
return [
' job ' => ' WhatheverFullyQualifiedClassNameToExecute@handle ' ,
' data ' => json_decode ( $ this -> getRawBody (), true )
];
}
}
如果您想处理原始消息,而不是 JSON 格式或 JSON 中没有“job”键,您应该为getName
方法添加存根:
<?php
namespace App Queue Jobs ;
use Illuminate Support Facades Log ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
public function fire ()
{
$ anyMessage = $ this -> getRawBody ();
Log:: info ( $ anyMessage );
$ this -> delete ();
}
public function getName ()
{
return '' ;
}
}
您可以扩展内置的PhpAmqpLibConnectionAMQPStreamConnection::class
或PhpAmqpLibConnectionAMQPSLLConnection::class
并在连接配置中,您可以定义自己的类。当您在配置中使用您自己的类名指定connection
键时,每个连接都将使用您自己的类。
配置示例:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' connection ' = > App Queue Connection MyRabbitMQConnection::class,
],
// ...
],
如果您想使用自己的RabbitMQQueue::class
可以通过扩展VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
来实现。并通过将RABBITMQ_WORKER
设置为AppQueueRabbitMQQueue::class
来通知 laravel 使用您的类。
注意:Worker 类必须扩展
VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to a class if you wish to use your own. */
' worker ' => App Queue RabbitMQQueue::class,
],
// ...
],
<?php
namespace App Queue ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
// ...
}
例如:重新连接实现。
如果要重新连接到 RabbitMQ,如果连接已断开。您可以重写发布和createChannel 方法。
注意:这不是最佳实践,而是一个示例。
<?php
namespace App Queue ;
use PhpAmqpLib Exception AMQPChannelClosedException ;
use PhpAmqpLib Exception AMQPConnectionClosedException ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
protected function publishBasic ( $ msg , $ exchange = '' , $ destination = '' , $ mandatory = false , $ immediate = false , $ ticket = null ): void
{
try {
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
}
}
protected function publishBatch ( $ jobs , $ data = '' , $ queue = null ): void
{
try {
parent :: publishBatch ( $ jobs , $ data , $ queue );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBatch ( $ jobs , $ data , $ queue );
}
}
protected function createChannel (): AMQPChannel
{
try {
return parent :: createChannel ();
} catch ( AMQPConnectionClosedException ) {
$ this -> reconnect ();
return parent :: createChannel ();
}
}
}
当 laravel 未提供队列时,连接确实使用值为“default”的默认队列。可以通过在连接配置中添加额外参数来更改默认队列。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' queue ' => env ( ' RABBITMQ_QUEUE ' , ' default ' ),
],
// ...
],
默认情况下,您的连接将使用心跳设置0
创建。您可以通过更改配置来更改心跳设置。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
// ...
' heartbeat ' => 10 ,
],
],
// ...
],
如果您需要与rabbitMQ服务器的安全连接,则需要添加这些额外的配置选项。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' secure ' = > true ,
' options ' => [
// ...
' ssl_options ' => [
' cafile ' => env ( ' RABBITMQ_SSL_CAFILE ' , null ),
' local_cert ' => env ( ' RABBITMQ_SSL_LOCALCERT ' , null ),
' local_key ' => env ( ' RABBITMQ_SSL_LOCALKEY ' , null ),
' verify_peer ' => env ( ' RABBITMQ_SSL_VERIFY_PEER ' , true ),
' passphrase ' => env ( ' RABBITMQ_SSL_PASSPHRASE ' , null ),
],
],
],
// ...
],
指示 Laravel 工作人员在所有数据库提交完成后分派事件。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' after_commit ' => true ,
],
// ...
],
默认情况下,您的连接将创建为惰性连接。如果由于某种原因您不希望连接延迟,您可以通过设置以下配置将其关闭。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' lazy ' = > false ,
],
// ...
],
默认情况下,用于连接的网络协议为 tcp。如果由于某种原因您想使用其他网络协议,您可以在配置选项中添加额外的值。可用协议: tcp
、 ssl
、 tls
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' network_protocol ' => ' tcp ' ,
],
// ...
],
从 13.3.0 开始,该软件包开箱即用地支持 Laravel Octane。首先,安装 Octane 并且不要忘记在 Octane 配置中预热“rabbitmq”连接。
请参阅:#460(评论)
完成配置后,您可以使用 Laravel Queue API。如果您使用其他队列驱动程序,则无需更改任何其他内容。如果您不知道如何使用 Queue API,请参考 Laravel 官方文档:http://laravel.com/docs/queues
对于 Lumen 使用,应在bootstrap/app.php
中手动注册服务提供商,如下所示:
$ app -> register ( VladimirYuldashev LaravelQueueRabbitMQ LaravelQueueRabbitMQServiceProvider::class);
有两种消费消息的方式。
queue:work
命令是 Laravel 的内置命令。该命令使用basic_get
。如果您想使用多个队列,请使用此选项。
rabbitmq:consume
这个包提供的命令。此命令利用basic_consume
,性能比basic_get
提高约 2 倍,但不支持多个队列。
使用docker-compose
设置 RabbitMQ :
docker compose up -d
要运行测试套件,您可以使用以下命令:
# To run both style and unit tests.
composer test
# To run only style tests.
composer test:style
# To run only unit tests.
composer test:unit
如果您从样式测试中收到任何错误,您可以使用以下命令自动修复大多数(如果不是全部)问题:
composer fix:style
您可以通过发现错误和打开问题来为这个包做出贡献。请添加到您创建拉取请求或问题的包版本。 (例如[5.2]延迟作业的致命错误)