只有最新版本才會獲得新功能。將使用以下方案提供錯誤修復:
套餐版本 | Laravel 版本 | 錯誤修復直至 | |
---|---|---|---|
13 | 9 | 2023 年 8 月 8 日 | 文件 |
您可以使用以下命令透過 Composer 安裝此軟體包:
composer require vladimir-yuldashev/laravel-queue-rabbitmq
該包將自動註冊。
新增到config/queue.php
的連接:
這是rabbitMQ 連線/驅動程式工作的最小配置。
' connections ' => [
// ...
' rabbitmq ' => [
' driver ' => ' rabbitmq ' ,
' hosts ' => [
[
' host ' => env ( ' RABBITMQ_HOST ' , ' 127.0.0.1 ' ),
' port ' => env ( ' RABBITMQ_PORT ' , 5672 ),
' user ' => env ( ' RABBITMQ_USER ' , ' guest ' ),
' password ' => env ( ' RABBITMQ_PASSWORD ' , ' guest ' ),
' vhost ' => env ( ' RABBITMQ_VHOST ' , ' / ' ),
],
// ...
],
// ...
],
// ...
],
可以選擇將佇列選項新增至連線的配置。為此連接創建的每個隊列都會取得屬性。
當您想要在訊息延遲時優先處理訊息時,可以透過新增額外的選項來實現。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' prioritize_delayed ' => false ,
' queue_max_priority ' => 10 ,
],
],
],
// ...
],
當您想要針對具有路由金鑰的交換發布訊息時,可以透過新增額外的選項來實現。
amq.direct
交換作為路由金鑰queue
名稱。%s
時,queue_name 將被替換。注意:當使用帶有路由金鑰的交換時,您可能會自行建立具有綁定的佇列。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' exchange ' => ' application-x ' ,
' exchange_type ' => ' topic ' ,
' exchange_routing_key ' => '' ,
],
],
],
// ...
],
在 Laravel 中,失敗的作業被儲存到資料庫中。但也許您想指示其他進程也對訊息執行某些操作。當您想要指示 RabbitMQ 將失敗的訊息重新路由到交換器或特定佇列時,可以透過新增額外的選項來實現。
amq.direct
交換作為路由金鑰queue
名稱將會替換為'.failed'
。%s
時,queue_name 將被替換。注意:當使用 failed_job 交換與路由鍵時,您可能需要自行建立具有綁定的交換/佇列。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' reroute_failed ' => true ,
' failed_exchange ' => ' failed-exchange ' ,
' failed_routing_key ' => ' application-x.%s ' ,
],
],
],
// ...
],
從 8.0 開始,該軟體包開箱即用地支援 Laravel Horizon。首先,安裝 Horizon ,然後將RABBITMQ_WORKER
設定為horizon
。
Horizon 取決於工作人員調度的事件。這些事件通知 Horizon 對訊息/作業做了什麼。
這個函式庫支援 Horizon,但是在設定中你必須通知 Laravel 使用與 Horizon 相容的 QueueApi。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to "horizon" if you wish to use Laravel Horizon. */
' worker ' => env ( ' RABBITMQ_WORKER ' , ' default ' ),
],
// ...
],
有時您必須處理另一個應用程式發布的消息。
這些訊息可能不尊重 Laravel 的作業負載模式。這些訊息的問題在於,Laravel 工作人員將無法確定要執行的實際作業或類別。
您可以擴展內建RabbitMQJob::class
,並在佇列連接配置中定義您自己的類別。當您在設定中使用您自己的類別名稱指定job
鍵時,從代理程式擷取的每個訊息都將由您自己的類別包裝。
設定範例:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' job ' => App Queue Jobs RabbitMQJob::class,
],
],
],
// ...
],
您自己的工作類別的範例:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Fire the job.
*
* @return void
*/
public function fire ()
{
$ payload = $ this -> payload ();
$ class = WhatheverClassNameToExecute::class;
$ method = ' handle ' ;
( $ this -> instance = $ this -> resolve ( $ class ))->{ $ method }( $ this , $ payload );
$ this -> delete ();
}
}
或者您可能想要為有效負載添加額外的屬性:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Get the decoded body of the job.
*
* @return array
*/
public function payload ()
{
return [
' job ' => ' WhatheverFullyQualifiedClassNameToExecute@handle ' ,
' data ' => json_decode ( $ this -> getRawBody (), true )
];
}
}
如果您想處理原始訊息,而不是 JSON 格式或 JSON 中沒有「job」鍵,您應該為getName
方法新增存根:
<?php
namespace App Queue Jobs ;
use Illuminate Support Facades Log ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
public function fire ()
{
$ anyMessage = $ this -> getRawBody ();
Log:: info ( $ anyMessage );
$ this -> delete ();
}
public function getName ()
{
return '' ;
}
}
您可以擴充內建的PhpAmqpLibConnectionAMQPStreamConnection::class
或PhpAmqpLibConnectionAMQPSLLConnection::class
並在連線配置中,您可以定義自己的類別。當您在設定中使用您自己的類別名稱指定connection
鍵時,每個連線都會使用您自己的類別。
設定範例:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' connection ' = > App Queue Connection MyRabbitMQConnection::class,
],
// ...
],
如果您想使用自己的RabbitMQQueue::class
可以透過擴充VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
來實現。並透過將RABBITMQ_WORKER
設定為AppQueueRabbitMQQueue::class
來通知 laravel 使用您的類別。
注意:Worker 類別必須擴充
VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to a class if you wish to use your own. */
' worker ' => App Queue RabbitMQQueue::class,
],
// ...
],
<?php
namespace App Queue ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
// ...
}
例如:重新連線實作。
如果要重新連線到 RabbitMQ,如果連線已中斷。您可以重寫發布和createChannel 方法。
注意:這不是最佳實踐,而是一個範例。
<?php
namespace App Queue ;
use PhpAmqpLib Exception AMQPChannelClosedException ;
use PhpAmqpLib Exception AMQPConnectionClosedException ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
protected function publishBasic ( $ msg , $ exchange = '' , $ destination = '' , $ mandatory = false , $ immediate = false , $ ticket = null ): void
{
try {
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
}
}
protected function publishBatch ( $ jobs , $ data = '' , $ queue = null ): void
{
try {
parent :: publishBatch ( $ jobs , $ data , $ queue );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBatch ( $ jobs , $ data , $ queue );
}
}
protected function createChannel (): AMQPChannel
{
try {
return parent :: createChannel ();
} catch ( AMQPConnectionClosedException ) {
$ this -> reconnect ();
return parent :: createChannel ();
}
}
}
當 laravel 未提供佇列時,連線確實使用值為「default」的預設佇列。可以透過在連線配置中新增額外參數來變更預設佇列。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' queue ' => env ( ' RABBITMQ_QUEUE ' , ' default ' ),
],
// ...
],
預設情況下,您的連線將使用心跳設定0
建立。您可以透過變更配置來變更心跳設定。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
// ...
' heartbeat ' => 10 ,
],
],
// ...
],
如果您需要與rabbitMQ伺服器的安全連接,則需要新增這些額外的設定選項。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' secure ' = > true ,
' options ' => [
// ...
' ssl_options ' => [
' cafile ' => env ( ' RABBITMQ_SSL_CAFILE ' , null ),
' local_cert ' => env ( ' RABBITMQ_SSL_LOCALCERT ' , null ),
' local_key ' => env ( ' RABBITMQ_SSL_LOCALKEY ' , null ),
' verify_peer ' => env ( ' RABBITMQ_SSL_VERIFY_PEER ' , true ),
' passphrase ' => env ( ' RABBITMQ_SSL_PASSPHRASE ' , null ),
],
],
],
// ...
],
指示 Laravel 工作人員在所有資料庫提交完成後分派事件。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' after_commit ' => true ,
],
// ...
],
預設情況下,您的連線將建立為惰性連線。如果您由於某種原因不希望連線延遲,您可以透過設定以下配置將其關閉。
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' lazy ' = > false ,
],
// ...
],
預設情況下,用於連接的網路協定為 tcp。如果您由於某種原因想要使用其他網路協議,您可以在配置選項中新增額外的值。可用協定: tcp
、 ssl
、 tls
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' network_protocol ' => ' tcp ' ,
],
// ...
],
從 13.3.0 開始,該軟體包開箱即用地支援 Laravel Octane。首先,安裝 Octane 並且不要忘記在 Octane 設定中預熱「rabbitmq」連線。
請參閱:#460(評論)
完成配置後,您可以使用 Laravel Queue API。如果您使用其他佇列驅動程序,則無需更改任何其他內容。如果您不知道如何使用 Queue API,請參考 Laravel 官方文件:http://laravel.com/docs/queues
對於 Lumen 使用,應在bootstrap/app.php
中手動註冊服務提供者,如下所示:
$ app -> register ( VladimirYuldashev LaravelQueueRabbitMQ LaravelQueueRabbitMQServiceProvider::class);
有兩種消費訊息的方式。
queue:work
指令是 Laravel 內建的指令。這個指令使用basic_get
。如果您想使用多個佇列,請使用此選項。
rabbitmq:consume
這個包提供的命令。此指令利用basic_consume
,效能比basic_get
提高約 2 倍,但不支援多個佇列。
使用docker-compose
設定 RabbitMQ :
docker compose up -d
要運行測試套件,您可以使用以下命令:
# To run both style and unit tests.
composer test
# To run only style tests.
composer test:style
# To run only unit tests.
composer test:unit
如果您從樣式測試中收到任何錯誤,您可以使用以下命令自動修復大多數(如果不是全部)問題:
composer fix:style
您可以透過發現錯誤和打開問題來為這個包做出貢獻。請新增至您建立拉取請求或問題的套件版本。 (例如[5.2]延遲作業的致命錯誤)