Nur die neueste Version erhält neue Funktionen. Fehlerbehebungen werden nach folgendem Schema bereitgestellt:
Paketversion | Laravel-Version | Fehlerbehebungen bis | |
---|---|---|---|
13 | 9 | 8. August 2023 | Dokumentation |
Sie können dieses Paket über Composer mit diesem Befehl installieren:
composer require vladimir-yuldashev/laravel-queue-rabbitmq
Das Paket registriert sich automatisch.
Verbindung zu config/queue.php
hinzufügen:
Dies ist die Mindestkonfiguration, damit die RabbitMQ-Verbindung/der RabbitMQ-Treiber funktioniert.
' connections ' => [
// ...
' rabbitmq ' => [
' driver ' => ' rabbitmq ' ,
' hosts ' => [
[
' host ' => env ( ' RABBITMQ_HOST ' , ' 127.0.0.1 ' ),
' port ' => env ( ' RABBITMQ_PORT ' , 5672 ),
' user ' => env ( ' RABBITMQ_USER ' , ' guest ' ),
' password ' => env ( ' RABBITMQ_PASSWORD ' , ' guest ' ),
' vhost ' => env ( ' RABBITMQ_VHOST ' , ' / ' ),
],
// ...
],
// ...
],
// ...
],
Fügen Sie optional Warteschlangenoptionen zur Konfiguration einer Verbindung hinzu. Jede für diese Verbindung erstellte Warteschlange erhält die Eigenschaften.
Wenn Sie verspätete Nachrichten priorisieren möchten, ist dies durch Hinzufügen zusätzlicher Optionen möglich.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' prioritize_delayed ' => false ,
' queue_max_priority ' => 10 ,
],
],
],
// ...
],
Wenn Sie Nachrichten über eine Börse mit Routing-Schlüsseln veröffentlichen möchten, ist dies durch das Hinzufügen zusätzlicher Optionen möglich.
amq.direct
Austausch für den Routing-Schlüsselqueue
.%s
im Routing-Schlüssel wird der queue_name ersetzt.Hinweis: Wenn Sie einen Austausch mit Routing-Schlüssel verwenden, erstellen Sie Ihre Warteschlangen mit Bindungen wahrscheinlich selbst.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' exchange ' => ' application-x ' ,
' exchange_type ' => ' topic ' ,
' exchange_routing_key ' => '' ,
],
],
],
// ...
],
In Laravel werden fehlgeschlagene Jobs in der Datenbank gespeichert. Aber vielleicht möchten Sie einen anderen Prozess anweisen, ebenfalls etwas mit der Nachricht zu tun. Wenn Sie RabbitMQ anweisen möchten, fehlgeschlagene Nachrichten an einen Austausch oder eine bestimmte Warteschlange umzuleiten, ist dies durch Hinzufügen zusätzlicher Optionen möglich.
amq.direct
Austausch für den Routing-Schlüsselqueue
durch '.failed'
ersetzt.%s
im Routing-Schlüssel wird der queue_name ersetzt.Hinweis: Wenn Sie den Austausch „failed_job“ mit Routing-Key verwenden, müssen Sie Ihren Austausch/Ihre Warteschlange mit Bindungen wahrscheinlich selbst erstellen.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' reroute_failed ' => true ,
' failed_exchange ' => ' failed-exchange ' ,
' failed_routing_key ' => ' application-x.%s ' ,
],
],
],
// ...
],
Ab 8.0 unterstützt dieses Paket Laravel Horizon sofort. Installieren Sie zunächst Horizon und setzen Sie dann RABBITMQ_WORKER
auf horizon
.
Horizon hängt von den vom Worker ausgelösten Ereignissen ab. Diese Ereignisse informieren Horizon darüber, was mit der Nachricht/dem Auftrag geschehen ist.
Diese Bibliothek unterstützt Horizon, aber in der Konfiguration müssen Sie Laravel anweisen, die mit Horizon kompatible QueueApi zu verwenden.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to "horizon" if you wish to use Laravel Horizon. */
' worker ' => env ( ' RABBITMQ_WORKER ' , ' default ' ),
],
// ...
],
Manchmal müssen Sie mit Nachrichten arbeiten, die von einer anderen Anwendung veröffentlicht wurden.
Diese Nachrichten werden das Job-Payload-Schema von Laravel wahrscheinlich nicht respektieren. Das Problem mit diesen Nachrichten besteht darin, dass Laravel-Worker nicht in der Lage sind, den tatsächlich auszuführenden Job oder die auszuführende Klasse zu bestimmen.
Sie können die integrierte RabbitMQJob::class
erweitern und innerhalb der Warteschlangenverbindungskonfiguration Ihre eigene Klasse definieren. Wenn Sie in der Konfiguration einen job
mit Ihrem eigenen Klassennamen angeben, wird jede vom Broker abgerufene Nachricht von Ihrer eigenen Klasse umschlossen.
Ein Beispiel für die Konfiguration:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
' queue ' => [
// ...
' job ' => App Queue Jobs RabbitMQJob::class,
],
],
],
// ...
],
Ein Beispiel für Ihre eigene Berufsklasse:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Fire the job.
*
* @return void
*/
public function fire ()
{
$ payload = $ this -> payload ();
$ class = WhatheverClassNameToExecute::class;
$ method = ' handle ' ;
( $ this -> instance = $ this -> resolve ( $ class ))->{ $ method }( $ this , $ payload );
$ this -> delete ();
}
}
Oder vielleicht möchten Sie der Nutzlast zusätzliche Eigenschaften hinzufügen:
<?php
namespace App Queue Jobs ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
/**
* Get the decoded body of the job.
*
* @return array
*/
public function payload ()
{
return [
' job ' => ' WhatheverFullyQualifiedClassNameToExecute@handle ' ,
' data ' => json_decode ( $ this -> getRawBody (), true )
];
}
}
Wenn Sie Rohnachrichten verarbeiten möchten, die nicht im JSON-Format oder ohne „Job“-Schlüssel in JSON vorliegen, sollten Sie einen Stub für getName
-Methode hinzufügen:
<?php
namespace App Queue Jobs ;
use Illuminate Support Facades Log ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue Jobs RabbitMQJob as BaseJob ;
class RabbitMQJob extends BaseJob
{
public function fire ()
{
$ anyMessage = $ this -> getRawBody ();
Log:: info ( $ anyMessage );
$ this -> delete ();
}
public function getName ()
{
return '' ;
}
}
Sie können die integrierte PhpAmqpLibConnectionAMQPStreamConnection::class
oder PhpAmqpLibConnectionAMQPSLLConnection::class
erweitern und innerhalb der Verbindungskonfiguration Ihre eigene Klasse definieren. Wenn Sie in der Konfiguration einen connection
mit Ihrem eigenen Klassennamen angeben, verwendet jede Verbindung Ihre eigene Klasse.
Ein Beispiel für die Konfiguration:
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' connection ' = > App Queue Connection MyRabbitMQConnection::class,
],
// ...
],
Wenn Sie Ihre eigene RabbitMQQueue::class
verwenden möchten, ist dies durch die Erweiterung VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
möglich. und informieren Sie Laravel, Ihre Klasse zu verwenden, indem Sie RABBITMQ_WORKER
auf AppQueueRabbitMQQueue::class
setzen.
Hinweis: Worker-Klassen müssen
VladimirYuldashevLaravelQueueRabbitMQQueueRabbitMQQueue
erweitern
' connections ' => [
// ...
' rabbitmq ' => [
// ...
/* Set to a class if you wish to use your own. */
' worker ' => App Queue RabbitMQQueue::class,
],
// ...
],
<?php
namespace App Queue ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
// ...
}
Zum Beispiel: Eine Reconnect-Implementierung.
Wenn Sie die Verbindung zu RabbitMQ wiederherstellen möchten, wenn die Verbindung unterbrochen ist. Sie können die Methoden „publishing“ und „createChannel“ überschreiben.
Hinweis: Dies ist keine Best Practice, sondern ein Beispiel.
<?php
namespace App Queue ;
use PhpAmqpLib Exception AMQPChannelClosedException ;
use PhpAmqpLib Exception AMQPConnectionClosedException ;
use VladimirYuldashev LaravelQueueRabbitMQ Queue RabbitMQQueue as BaseRabbitMQQueue ;
class RabbitMQQueue extends BaseRabbitMQQueue
{
protected function publishBasic ( $ msg , $ exchange = '' , $ destination = '' , $ mandatory = false , $ immediate = false , $ ticket = null ): void
{
try {
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBasic ( $ msg , $ exchange , $ destination , $ mandatory , $ immediate , $ ticket );
}
}
protected function publishBatch ( $ jobs , $ data = '' , $ queue = null ): void
{
try {
parent :: publishBatch ( $ jobs , $ data , $ queue );
} catch ( AMQPConnectionClosedException | AMQPChannelClosedException ) {
$ this -> reconnect ();
parent :: publishBatch ( $ jobs , $ data , $ queue );
}
}
protected function createChannel (): AMQPChannel
{
try {
return parent :: createChannel ();
} catch ( AMQPConnectionClosedException ) {
$ this -> reconnect ();
return parent :: createChannel ();
}
}
}
Die Verbindung verwendet eine Standardwarteschlange mit dem Wert „default“, wenn von Laravel keine Warteschlange bereitgestellt wird. Es ist möglich, die Standardwarteschlange zu ändern, indem Sie einen zusätzlichen Parameter in der Verbindungskonfiguration hinzufügen.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' queue ' => env ( ' RABBITMQ_QUEUE ' , ' default ' ),
],
// ...
],
Standardmäßig wird Ihre Verbindung mit der Heartbeat-Einstellung 0
erstellt. Sie können die Heartbeat-Einstellungen ändern, indem Sie die Konfiguration ändern.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' options ' => [
// ...
' heartbeat ' => 10 ,
],
],
// ...
],
Wenn Sie eine sichere Verbindung zu RabbitMQ-Servern benötigen, müssen Sie diese zusätzlichen Konfigurationsoptionen hinzufügen.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' secure ' = > true ,
' options ' => [
// ...
' ssl_options ' => [
' cafile ' => env ( ' RABBITMQ_SSL_CAFILE ' , null ),
' local_cert ' => env ( ' RABBITMQ_SSL_LOCALCERT ' , null ),
' local_key ' => env ( ' RABBITMQ_SSL_LOCALKEY ' , null ),
' verify_peer ' => env ( ' RABBITMQ_SSL_VERIFY_PEER ' , true ),
' passphrase ' => env ( ' RABBITMQ_SSL_PASSPHRASE ' , null ),
],
],
],
// ...
],
Um Laravel-Mitarbeiter anzuweisen, Ereignisse auszulösen, nachdem alle Datenbank-Commits abgeschlossen sind.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' after_commit ' => true ,
],
// ...
],
Standardmäßig wird Ihre Verbindung als Lazy-Verbindung erstellt. Wenn Sie aus irgendeinem Grund nicht möchten, dass die Verbindung verzögert wird, können Sie sie deaktivieren, indem Sie die folgende Konfiguration festlegen.
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' lazy ' = > false ,
],
// ...
],
Standardmäßig wird für die Verbindung das Netzwerkprotokoll TCP verwendet. Wenn Sie aus irgendeinem Grund ein anderes Netzwerkprotokoll verwenden möchten, können Sie den zusätzlichen Wert in Ihren Konfigurationsoptionen hinzufügen. Verfügbare Protokolle: tcp
, ssl
, tls
' connections ' => [
// ...
' rabbitmq ' => [
// ...
' network_protocol ' => ' tcp ' ,
],
// ...
],
Ab 13.3.0 unterstützt dieses Paket Laravel Octane sofort. Installieren Sie zunächst Octane und vergessen Sie nicht, die Verbindung „rabbitmq“ in der Octane-Konfiguration zu erwärmen.
Siehe: #460 (Kommentar)
Sobald Sie die Konfiguration abgeschlossen haben, können Sie die Laravel Queue API verwenden. Wenn Sie andere Warteschlangentreiber verwendet haben, müssen Sie keine weiteren Änderungen vornehmen. Wenn Sie nicht wissen, wie Sie die Queue-API verwenden, lesen Sie bitte die offizielle Laravel-Dokumentation: http://laravel.com/docs/queues
Für die Lumen-Nutzung sollte der Dienstanbieter manuell wie folgt in bootstrap/app.php
registriert werden:
$ app -> register ( VladimirYuldashev LaravelQueueRabbitMQ LaravelQueueRabbitMQServiceProvider::class);
Es gibt zwei Möglichkeiten, Nachrichten zu konsumieren.
queue:work
-Befehl, der der integrierte Befehl von Laravel ist. Dieser Befehl verwendet basic_get
. Verwenden Sie dies, wenn Sie mehrere Warteschlangen nutzen möchten.
rabbitmq:consume
-Befehl, der von diesem Paket bereitgestellt wird. Dieser Befehl nutzt basic_consume
und ist etwa 2x leistungsfähiger als basic_get
, unterstützt jedoch nicht mehrere Warteschlangen.
Richten Sie RabbitMQ mit docker-compose
ein:
docker compose up -d
Um die Testsuite auszuführen, können Sie die folgenden Befehle verwenden:
# To run both style and unit tests.
composer test
# To run only style tests.
composer test:style
# To run only unit tests.
composer test:unit
Wenn Sie bei den Stiltests Fehler erhalten, können Sie die meisten, wenn nicht alle Probleme mit dem folgenden Befehl automatisch beheben:
composer fix:style
Sie können zu diesem Paket beitragen, indem Sie Fehler entdecken und Probleme lösen. Bitte fügen Sie hinzu, zu welcher Version des Pakets Sie eine Pull-Anfrage oder ein Pull-Problem erstellen. (z. B. [5.2] Schwerwiegender Fehler bei verzögertem Auftrag)