Только последняя версия получит новые функции. Исправления ошибок будут осуществляться по следующей схеме:
Версия пакета | Версия Ларавел | Исправления ошибок до | |
---|---|---|---|
13 | 9 | 8 августа 2023 г. | Документация |
Вы можете установить этот пакет через композитор, используя эту команду:
composer require vladimir-yuldashev/laravel-queue-rabbitmq
Пакет автоматически зарегистрируется.
Добавьте соединение в config/queue.php
:
Это минимальная конфигурация для работы соединения/драйвера RabbitMQ.
' connections ' => [
// ...
' rabbitmq ' => [
' driver ' => ' rabbitmq ' ,
' hosts ' => [
[
' host ' => env ( ' RABBITMQ_HOST ' , ' 127.0.0.1 ' ),
' port ' => env ( ' RABBITMQ_PORT ' , 5672 ),
' user ' => env ( ' RABBITMQ_USER ' , ' guest ' ),
' password ' => env ( ' RABBITMQ_PASSWORD ' , ' guest ' ),
' vhost ' => env ( ' RABBITMQ_VHOST ' , ' / ' ),
],
// ...
],
// ...
],
// ...
],
При необходимости добавьте параметры очереди в конфигурацию соединения. Каждая очередь, созданная для этого соединения, получает свойства.
Если вы хотите расставить приоритеты сообщений в случае их задержки, это возможно путем добавления дополнительных опций.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' prioritize_delayed ' => false ,
' queue_max_priority ' => 10 ,
],
],
],
// ...
],
Если вы хотите публиковать сообщения на бирже с ключами маршрутизации, это возможно путем добавления дополнительных опций.
amq.direct
для ключа маршрутизации.queue
.%s
в ключе маршрутизации будет заменено имя_очереди.Примечание: при использовании обмена с ключом маршрутизации вы, вероятно, сами создаете свои очереди с привязками.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' exchange ' => ' application-x ' ,
' exchange_type ' => ' topic ' ,
' exchange_routing_key ' => '' ,
],
],
],
// ...
],
В Laravel неудачные задания сохраняются в базе данных. Но, возможно, вы захотите поручить какому-то другому процессу также что-то сделать с сообщением. Если вы хотите указать RabbitMQ перенаправлять неудачные сообщения на обмен или в определенную очередь, это возможно путем добавления дополнительных опций.
amq.direct
для ключа маршрутизации.queue
заменяются на '.failed'
.%s
в ключе маршрутизации будет заменено имя_очереди.Примечание. При использовании обменаfail_job с ключом маршрутизации вам, вероятно, придется самостоятельно создать обмен/очередь с привязками.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' reroute_failed ' => true ,
' failed_exchange ' => ' failed-exchange ' ,
' failed_routing_key ' => ' application-x.%s ' ,
],
],
],
// ...
],
Начиная с версии 8.0, этот пакет поддерживает Laravel Horizon «из коробки». Сначала установите Horizon, а затем установите для RABBITMQ_WORKER
значение horizon
.
Горизонт зависит от событий, отправленных работником. Эти события сообщают Horizon, что было сделано с сообщением/заданием.
Эта библиотека поддерживает Horizon, но в конфигурации вы должны указать Laravel, что нужно использовать QueueApi, совместимый с Horizon.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to "horizon" if you wish to use Laravel Horizon. */
' worker ' => env ( ' RABBITMQ_WORKER ' , ' default ' ),
],
// ...
],
Иногда приходится работать с сообщениями, опубликованными другим приложением.
Эти сообщения, вероятно, не будут учитывать схему полезной нагрузки задания Laravel. Проблема с этими сообщениями заключается в том, что работники Laravel не смогут определить фактическое задание или класс для выполнения.
Вы можете расширить встроенный класс RabbitMQJob::class
и в конфигурации подключения к очереди определить свой собственный класс. Когда вы указываете в конфигурации ключ job
с собственным именем класса, каждое сообщение, полученное от брокера, будет заключено в ваш собственный класс.
Пример конфигурации:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' job ' => App Queue Jobs RabbitMQJob::class,
],
],
],
// ...
],
Пример вашего собственного класса работы:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Fire the job.
*
* @return void
*/
public function fire ()
{
$ payload = $ this -> payload ();
$ class = WhatheverClassNameToExecute::class;
$ method = ' handle ' ;
( $ this -> instance = $ this -> resolve ( $ class ))->{ $ method }( $ this , $ payload );
$ this -> delete ();
}
}
Или, может быть, вы хотите добавить к полезной нагрузке дополнительные свойства:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Get the decoded body of the job.
*
* @return array
*/
public function payload ()
{
return [
' job ' => ' WhatheverFullyQualifiedClassNameToExecute@handle ' ,
' data ' => json_decode ( $ this -> getRawBody (), true )
];
}
}
Если вы хотите обрабатывать необработанное сообщение, а не в формате JSON или без ключа «задание» в JSON, вам следует добавить заглушку для метода getName
:
<?php
namespace App Queue Jobs ;
use Illuminate Support Facades Log ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
public function fire ()
{
$ anyMessage = $ this -> getRawBody ();
Log:: info ( $ anyMessage );
$ this -> delete ();
}
public function getName ()
{
return '' ;
}
}
Вы можете расширить встроенный класс PhpAmqpLibConnectionAMQPStreamConnection::class
или PhpAmqpLibConnectionAMQPSLLConnection::class
, а в конфигурации соединения вы можете определить свой собственный класс. Когда вы указываете в конфигурации ключ connection
с собственным именем класса, каждое соединение будет использовать ваш собственный класс.
Пример конфигурации:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' connection ' = > App Queue Connection MyRabbitMQConnection::class,
],
// ...
],
Если вы хотите использовать свой собственный RabbitMQQueue::class
это возможно путем расширения VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
. и сообщите laravel, чтобы он использовал ваш класс, установив для RABBITMQ_WORKER
значение AppQueueRabbitMQQueue::class
.
Примечание. Рабочие классы должны расширять
VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to a class if you wish to use your own. */
' worker ' => App Queue RabbitMQQueue::class,
],
// ...
],
<?php
namespace App Queue ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
// ...
}
Например: реализация повторного подключения.
Если вы хотите повторно подключиться к RabbitMQ, если соединение прервано. Вы можете переопределить методы публикации и createChannel.
Примечание. Это не лучшая практика, это пример.
<?php
namespace App Queue ;
use PhpAmqpLib Exception AMQPChannelClosedException ;
use PhpAmqpLib Exception AMQPConnectionClosedException ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
protected function publishBasic ( $ msg , $ exchange = '' , $ destination = '' , $ mandatory = false , $ immediate = false , $ ticket = null ): void
{
try {
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
}
}
protected function publishBatch ( $ jobs , $ data = '' , $ queue = null ): void
{
try {
parent :: publishBatch ( $ jobs , $ data , $ queue );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBatch ( $ jobs , $ data , $ queue );
}
}
protected function createChannel (): AMQPChannel
{
try {
return parent :: createChannel ();
} catch ( AMQPConnectionClosedException ) {
$ this -> reconnect ();
return parent :: createChannel ();
}
}
}
Соединение использует очередь по умолчанию со значением «по умолчанию», если laravel не предоставляет очередь. Можно изменить очередь по умолчанию, добавив дополнительный параметр в конфигурацию соединения.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' queue ' => env ( ' RABBITMQ_QUEUE ' , ' default ' ),
],
// ...
],
По умолчанию ваше соединение будет создано с настройкой пульса 0
. Вы можете изменить настройки пульса, изменив файл config.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
// ...
' heartbeat ' => 10 ,
],
],
// ...
],
Если вам нужно безопасное соединение с серверами RabbitMQ, вам нужно будет добавить эти дополнительные параметры конфигурации.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' secure ' = > true ,
' options ' => [
// ...
' ssl_options ' => [
' cafile ' => env ( ' RABBITMQ_SSL_CAFILE ' , null ),
' local_cert ' => env ( ' RABBITMQ_SSL_LOCALCERT ' , null ),
' local_key ' => env ( ' RABBITMQ_SSL_LOCALKEY ' , null ),
' verify_peer ' => env ( ' RABBITMQ_SSL_VERIFY_PEER ' , true ),
' passphrase ' => env ( ' RABBITMQ_SSL_PASSPHRASE ' , null ),
],
],
],
// ...
],
Чтобы проинструктировать работники Laravel отправлять события после завершения всех фиксаций базы данных.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' after_commit ' => true ,
],
// ...
],
По умолчанию ваше соединение будет создано как ленивое. Если по какой-то причине вы не хотите, чтобы соединение было ленивым, вы можете отключить его, установив следующую конфигурацию.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' lazy ' = > false ,
],
// ...
],
По умолчанию для подключения используется сетевой протокол tcp. Если по какой-то причине вы хотите использовать другой сетевой протокол, вы можете добавить дополнительное значение в параметры конфигурации. Доступные протоколы: tcp
, ssl
, tls
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' network_protocol ' => ' tcp ' ,
],
// ...
],
Начиная с версии 13.3.0, этот пакет поддерживает Laravel Octane «из коробки». Во-первых, установите Octane и не забудьте прогреть соединение «rabbitmq» в конфигурации Octane.
См.: #460 (комментарий)
После завершения настройки вы можете использовать API Laravel Queue. Если вы использовали другие драйверы очередей, вам больше ничего менять не нужно. Если вы не знаете, как использовать Queue API, обратитесь к официальной документации Laravel: http://laravel.com/docs/queues.
Для использования Lumen поставщик услуг должен быть зарегистрирован вручную, как указано в bootstrap/app.php
:
$ app -> register ( VladimirYuldashev LaravelQueueRabbitMQ LaravelQueueRabbitMQServiceProvider::class);
Существует два способа потребления сообщений.
Команда queue:work
— встроенная команда Laravel. Эта команда использует basic_get
. Используйте это, если вы хотите использовать несколько очередей.
rabbitmq:consume
предоставляемая этим пакетом. Эта команда использует basic_consume
и более производительна, чем basic_get
примерно в 2 раза, но не поддерживает несколько очередей.
Настройте RabbitMQ с помощью docker-compose
:
docker compose up -d
Для запуска набора тестов вы можете использовать следующие команды:
# To run both style and unit tests.
composer test
# To run only style tests.
composer test:style
# To run only unit tests.
composer test:unit
Если вы получаете какие-либо ошибки в ходе тестов стиля, вы можете автоматически исправить большинство, если не все проблемы, с помощью следующей команды:
composer fix:style
Вы можете внести свой вклад в этот пакет, обнаруживая ошибки и открывая проблемы. Пожалуйста, укажите, в какой версии пакета вы создаете запрос на включение или проблему. (например, [5.2] Неустранимая ошибка при отложенном задании)