Das RabbitMqBundle
integriert Messaging in Ihre Anwendung über RabbitMQ mithilfe der php-amqplib-Bibliothek.
Das Bundle implementiert mehrere Nachrichtenmuster, wie sie in der Thumper-Bibliothek zu sehen sind. Daher ist das Veröffentlichen von Nachrichten an RabbitMQ von einem Symfony-Controller aus so einfach wie:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
Wenn Sie später 50 Nachrichten aus der Warteschlange upload_pictures
nutzen möchten, führen Sie einfach Folgendes auf der CLI aus:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Alle Beispiele setzen einen laufenden RabbitMQ-Server voraus.
Dieses Paket wurde auf der Symfony Live Paris 2011-Konferenz vorgestellt. Sehen Sie sich die Folien hier an.
Aufgrund der durch Symfony >=4.4 verursachten bahnbrechenden Änderungen wurde ein neues Tag veröffentlicht, das das Bundle mit Symfony >=4.4 kompatibel macht.
Fordern Sie das Bundle und seine Abhängigkeiten mit Composer an:
$ composer require php-amqplib/rabbitmq-bundle
Registrieren Sie das Bundle:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
Genießen !
Wenn Sie eine Konsolenanwendung zum Ausführen von RabbitMQ-Konsumenten haben, benötigen Sie Symfony HttpKernel und FrameworkBundle nicht. Ab Version 1.6 können Sie die Dependency-Injection-Komponente verwenden, um diese Bundle-Konfiguration und -Dienste zu laden und dann den Consumer-Befehl zu verwenden.
Erfordern Sie das Bundle in Ihrer Composer.json-Datei:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
Registrieren Sie die Erweiterung und den Compiler-Durchlauf:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
Seit 04.06.2012 Einige Standardoptionen für Börsen, die im Konfigurationsabschnitt „Produzenten“ deklariert wurden, wurden geändert, um mit den Standardoptionen der im Abschnitt „Verbraucher“ deklarierten Börsen übereinzustimmen. Die betroffenen Einstellungen sind:
durable
wurde von false
in true
geändert.auto_delete
wurde von true
in false
geändert.Ihre Konfiguration muss aktualisiert werden, wenn Sie sich auf die vorherigen Standardwerte verlassen haben.
Seit dem 24.04.2012 hat sich die Signatur der ConsumerInterface::execute-Methode geändert
Seit dem 03.01.2012 ruft die Consumer-Execute-Methode das gesamte AMQP-Nachrichtenobjekt und nicht nur den Text ab. Weitere Einzelheiten finden Sie in der CHANGELOG-Datei.
Fügen Sie den Abschnitt old_sound_rabbit_mq
in Ihre Konfigurationsdatei ein:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
Hier konfigurieren wir den Verbindungsdienst und die Nachrichtenendpunkte, die unsere Anwendung haben wird. In diesem Beispiel enthält Ihr Service-Container die Dienste old_sound_rabbit_mq.upload_picture_producer
und old_sound_rabbit_mq.upload_picture_consumer
. Letzteres erwartet, dass es einen Dienst namens upload_picture_service
gibt.
Wenn Sie für den Client keine Verbindung angeben, sucht der Client nach einer Verbindung mit demselben Alias. Für unser upload_picture
sucht der Servicecontainer also nach einer upload_picture
-Verbindung.
Wenn Sie optionale Warteschlangenargumente hinzufügen müssen, können Ihre Warteschlangenoptionen etwa so aussehen:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
ein weiteres Beispiel mit einer Nachrichten-TTL von 20 Sekunden:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
Der Argumentwert muss eine Liste von Datentyp und Wert sein. Gültige Datentypen sind:
S
– SaiteI
– GanzzahlD
– DezimalzahlT
– ZeitstempelF
- TischA
– Arrayt
– Bool Passen Sie die arguments
Ihren Bedürfnissen an.
Wenn Sie eine Warteschlange mit bestimmten Routing-Schlüsseln binden möchten, können Sie dies in der Produzenten- oder Verbraucherkonfiguration deklarieren:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
In einer Symfony-Umgebung werden alle Dienste für jede Anfrage vollständig gebootet. Ab Version >= 4.3 können Sie einen Dienst als faul deklarieren (Lazy Services). Dieses Paket unterstützt immer noch nicht die neue Funktion „Lazy Services“, aber Sie können lazy: true
in Ihrer Verbindungskonfiguration festlegen, um unnötige Verbindungen zu Ihrem Nachrichtenbroker bei jeder Anfrage zu vermeiden. Aus Leistungsgründen wird dringend empfohlen, Lazy-Verbindungen zu verwenden. Die Lazy-Option ist jedoch standardmäßig deaktiviert, um mögliche Unterbrechungen in Anwendungen zu vermeiden, die dieses Bundle bereits verwenden.
Es ist eine gute Idee, read_write_timeout
auf das Doppelte des Heartbeats zu setzen, damit Ihr Socket geöffnet ist. Wenn Sie dies nicht tun oder einen anderen Multiplikator verwenden, besteht die Gefahr, dass der Consumer- Socket eine Zeitüberschreitung erleidet.
Bitte bedenken Sie, dass Sie mit Problemen rechnen müssen, wenn Ihre Aufgaben generell länger als die Heartbeat-Periode laufen, für die es keine guten Lösungen gibt (Link). Erwägen Sie die Verwendung eines großen Werts für den Heartbeat oder lassen Sie den Heartbeat zugunsten des TCP- keepalive
(sowohl auf der Client- als auch der Serverseite) und der Funktion graceful_max_execution_timeout
deaktiviert.
Sie können mehrere Hosts für eine Verbindung bereitstellen. Dadurch können Sie den RabbitMQ-Cluster mit mehreren Knoten verwenden.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
Achten Sie darauf, dass Sie nichts angeben können
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
Parameter für jeden Host separat.
Manchmal müssen Ihre Verbindungsinformationen dynamisch sein. Mit dynamischen Verbindungsparametern können Sie Parameter programmgesteuert über einen Dienst bereitstellen oder überschreiben.
Beispielsweise in einem Szenario, in dem der vhost
-Parameter der Verbindung vom aktuellen Mandanten Ihrer White-Label-Anwendung abhängt und Sie dessen Konfiguration nicht jedes Mal ändern möchten (oder können).
Definieren Sie unter connection_parameters_provider
einen Dienst, der ConnectionParametersProviderInterface
implementiert, und fügen Sie ihn der entsprechenden connections
hinzu.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
Beispielimplementierung:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
In diesem Fall wird der vhost
-Parameter durch die Ausgabe von getVhost()
überschrieben.
In einer Messaging-Anwendung wird der Prozess, der Nachrichten an den Broker sendet , als Producer bezeichnet, während der Prozess, der diese Nachrichten empfängt , als Consumer bezeichnet wird. In Ihrer Anwendung haben Sie mehrere davon, die Sie unter den jeweiligen Einträgen in der Konfiguration auflisten können.
Ein Produzent wird verwendet, um Nachrichten an den Server zu senden. Im AMQP-Modell werden Nachrichten an eine Börse gesendet. Das bedeutet, dass Sie in der Konfiguration für einen Produzenten die Verbindungsoptionen zusammen mit den Austauschoptionen angeben müssen, bei denen es sich normalerweise um den Namen der Börse und deren Typ handelt.
Nehmen wir nun an, Sie möchten Bild-Uploads im Hintergrund verarbeiten. Nachdem Sie das Bild an seinen endgültigen Speicherort verschoben haben, veröffentlichen Sie eine Nachricht mit den folgenden Informationen auf dem Server:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
Wie Sie sehen können, haben Sie, wenn Sie in Ihrer Konfiguration einen Produzenten namens upload_picture haben, im Service-Container einen Service namens old_sound_rabbit_mq.upload_picture_producer .
Neben der Nachricht selbst akzeptiert die Methode OldSoundRabbitMqBundleRabbitMqProducer#publish()
auch einen optionalen Routing-Schlüsselparameter und ein optionales Array zusätzlicher Eigenschaften. Mit dem Array zusätzlicher Eigenschaften können Sie die Eigenschaften ändern, mit denen ein PhpAmqpLibMessageAMQPMessage
Objekt standardmäßig erstellt wird. Auf diese Weise können Sie beispielsweise die Anwendungsheader ändern.
Sie können die Methoden setContentType und setDeliveryMode verwenden, um den Nachrichteninhaltstyp bzw. den Nachrichtenübermittlungsmodus festzulegen und dabei alle im Konfigurationsabschnitt „producers“ festgelegten Standardeinstellungen zu überschreiben. Sofern sie nicht durch die „producers“-Konfiguration oder einen expliziten Aufruf dieser Methoden (wie im folgenden Beispiel) überschrieben werden, sind die Standardwerte text/plain für den Inhaltstyp und 2 für den Bereitstellungsmodus.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
Wenn Sie eine benutzerdefinierte Klasse für einen Produzenten verwenden müssen (die von OldSoundRabbitMqBundleRabbitMqProducer
erben sollte), können Sie die class
verwenden:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
Der nächste Teil des Puzzles besteht darin, einen Verbraucher zu haben, der die Nachricht aus der Warteschlange nimmt und sie entsprechend verarbeitet.
Derzeit werden vom Produzenten zwei Ereignisse ausgegeben.
Dieses Ereignis tritt unmittelbar vor der Veröffentlichung der Nachricht ein. Dies ist ein guter Ansatz, um vor dem eigentlichen Senden der Nachricht eine abschließende Protokollierung, Validierung usw. durchzuführen. Eine Beispielimplementierung eines Listeners:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Dieses Ereignis tritt unmittelbar nach der Veröffentlichung der Nachricht ein. Dies ist ein guter Ansatz, um Bestätigungsprotokolle, Commits usw. durchzuführen, nachdem die Nachricht tatsächlich gesendet wurde. Eine Beispielimplementierung eines Listeners:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Ein Verbraucher stellt eine Verbindung zum Server her und startet eine Schleife, die auf die Verarbeitung eingehender Nachrichten wartet. Abhängig vom angegebenen Rückruf für einen solchen Verbraucher ist das Verhalten, das er haben wird. Sehen wir uns die Verbraucherkonfiguration von oben an:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
Wie wir dort sehen, hat die Callback -Option einen Verweis auf einen upload_picture_service . Wenn der Verbraucher eine Nachricht vom Server erhält, führt er einen solchen Rückruf aus. Wenn Sie zu Test- oder Debugzwecken einen anderen Rückruf angeben müssen, können Sie ihn dort ändern.
Neben dem Callback geben wir auch die zu verwendende Verbindung an, genau wie bei einem Producer . Die verbleibenden Optionen sind „ exchange_options“ und „queue_options“ . Die Exchange_Options sollten dieselben sein, die auch für den Producer verwendet werden. In den queue_options geben wir einen Warteschlangennamen an. Warum?
Wie bereits erwähnt, werden Nachrichten in AMQP an einer Börse veröffentlicht. Dies bedeutet nicht, dass die Nachricht eine Warteschlange erreicht hat. Dazu müssen wir zunächst eine solche Warteschlange erstellen und sie dann an die Börse binden. Das Coole daran ist, dass Sie mehrere Warteschlangen an eine Vermittlungsstelle binden können, sodass eine Nachricht an mehreren Zielen ankommen kann. Der Vorteil dieses Ansatzes liegt in der Entkopplung vom Produzenten und Konsumenten. Dem Produzenten ist es egal, wie viele Konsumenten seine Nachrichten verarbeiten. Alles was es braucht ist, dass seine Nachricht beim Server ankommt. Auf diese Weise können wir die Aktionen, die wir jedes Mal ausführen, wenn ein Bild hochgeladen wird, erweitern, ohne dass wir den Code in unserem Controller ändern müssen.
Wie führt man nun einen Verbraucher? Dafür gibt es einen Befehl, der so ausgeführt werden kann:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Was bedeutet das? Wir führen den Verbraucher upload_picture aus und weisen ihn an, nur 50 Nachrichten zu konsumieren. Jedes Mal, wenn der Verbraucher eine Nachricht vom Server empfängt, führt er den konfigurierten Rückruf aus und übergibt die AMQP-Nachricht als Instanz der Klasse PhpAmqpLibMessageAMQPMessage
. Der Nachrichtentext kann durch Aufruf von $msg->body
abgerufen werden. Standardmäßig verarbeitet der Verbraucher Nachrichten in einer Endlosschleife für eine Definition von „endlos“ .
Wenn Sie sicher sein möchten, dass der Verbraucher die Ausführung bei einem Unix-Signal sofort beendet, können Sie den Befehl mit der Flagge -w
ausführen.
$ ./app/console rabbitmq:consumer -w upload_picture
Dann beendet der Verbraucher die Ausführung sofort.
Um den Befehl mit diesem Flag verwenden zu können, müssen Sie PHP mit der PCNTL-Erweiterung installieren.
Wenn Sie ein Consumer-Speicherlimit festlegen möchten, können Sie dies mit dem Flag -l
tun. Im folgenden Beispiel fügt dieses Flag ein Speicherlimit von 256 MB hinzu. Der Consumer wird fünf MB vor Erreichen von 256 MB gestoppt, um einen Fehler bei der von PHP zugelassenen Speichergröße zu vermeiden.
$ ./app/console rabbitmq:consumer -l 256
Wenn Sie alle in einer Warteschlange wartenden Nachrichten entfernen möchten, können Sie diesen Befehl ausführen, um diese Warteschlange zu leeren:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
Um die Warteschlange des Verbrauchers zu löschen, verwenden Sie diesen Befehl:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
Dies kann in vielen Szenarien nützlich sein. Es gibt 3 AMQPEvents:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
Nehmen s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
und nach neuen Anwendungsbereitstellungen suchen.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Vor der Verarbeitung einer AMQPMessage
ausgelöstes Ereignis.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Ereignis, das nach der Verarbeitung einer AMQPMessage
ausgelöst wird. Wenn die Prozessmeldung eine Ausnahme auslöst, wird das Ereignis nicht ausgelöst.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
Ereignis wird ausgelöst, wenn die wait
aufgrund einer Zeitüberschreitung beendet wird, ohne dass eine Nachricht empfangen wird. Um dieses Ereignis nutzen zu können, muss ein Consumer- idle_timeout
konfiguriert werden. Standardmäßig wird der Prozess bei Leerlauf-Timeout beendet. Sie können dies verhindern, indem Sie $event->setForceStop(false)
in einem Listener festlegen.
Wenn Sie eine Zeitüberschreitung festlegen müssen, wenn während eines bestimmten Zeitraums keine Nachrichten aus Ihrer Warteschlange eingehen, können Sie den idle_timeout
in Sekunden festlegen. Der idle_timeout_exit_code
gibt an, welcher Exit-Code vom Verbraucher zurückgegeben werden soll, wenn das Leerlauf-Timeout auftritt. Ohne Angabe löst der Verbraucher eine PhpAmqpLibExceptionAMQPTimeoutException
-Ausnahme aus.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
Legen Sie den timeout_wait
in Sekunden fest. timeout_wait
gibt an, wie lange der Verbraucher wartet, ohne eine neue Nachricht zu erhalten, bevor er sicherstellt, dass die aktuelle Verbindung noch gültig ist.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
Wenn Sie möchten, dass Ihr Verbraucher bis zu einer bestimmten Zeit läuft und dann ordnungsgemäß beendet wird, legen Sie graceful_max_execution.timeout
in Sekunden fest. „Anmutig beenden“ bedeutet, dass der Verbraucher entweder nach der aktuell ausgeführten Aufgabe oder sofort, wenn er auf neue Aufgaben wartet, beendet wird. Der graceful_max_execution.exit_code
gibt an, welcher Exit-Code vom Verbraucher zurückgegeben werden soll, wenn das zulässige maximale Ausführungszeitlimit eintritt. Ohne Angabe wird der Verbraucher mit dem Status 0
beendet.
Diese Funktion eignet sich hervorragend in Verbindung mit supervisord, da sie zusammen die periodische Bereinigung von Speicherlecks, die Verbindung mit der Datenbank/RabbitMQ-Erneuerung und vieles mehr ermöglichen kann.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
Möglicherweise ist Ihnen aufgefallen, dass das Dispatching immer noch nicht genau so funktioniert, wie wir es möchten. Wenn beispielsweise in einer Situation mit zwei Arbeitern alle ungeraden Nachrichten schwer und die geraden Nachrichten leicht sind, ist ein Arbeiter ständig beschäftigt und der andere wird kaum arbeiten. Nun, RabbitMQ weiß davon nichts und wird die Nachrichten trotzdem gleichmäßig versenden.
Dies liegt daran, dass RabbitMQ eine Nachricht nur dann versendet, wenn die Nachricht in die Warteschlange gelangt. Die Anzahl der unbestätigten Nachrichten für einen Verbraucher wird nicht berücksichtigt. Es sendet einfach blind jede n-te Nachricht an den n-ten Verbraucher.
Um dies zu verhindern, können wir die Methode „basic.qos“ mit der Einstellung „prefetch_count=1“ verwenden. Dadurch wird RabbitMQ angewiesen, einem Arbeiter nicht mehr als eine Nachricht gleichzeitig zu übermitteln. Mit anderen Worten: Versenden Sie keine neue Nachricht an einen Mitarbeiter, bis dieser die vorherige Nachricht verarbeitet und bestätigt hat. Stattdessen wird es an den nächsten Arbeiter weitergeleitet, der noch nicht beschäftigt ist.
Von: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
Seien Sie vorsichtig, da die Implementierung des fairen Dispatchings zu einer Latenz führt, die sich negativ auf die Leistung auswirkt (siehe diesen Blogbeitrag). Durch die Implementierung können Sie jedoch horizontal und dynamisch skalieren, wenn die Warteschlange größer wird. Sie sollten, wie im Blogbeitrag empfohlen, den richtigen Wert von prefetch_size anhand der für die Verarbeitung jeder Nachricht benötigten Zeit und Ihrer Netzwerkleistung bewerten.
Mit RabbitMqBundle können Sie die qos_options pro Verbraucher folgendermaßen konfigurieren:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
Bei Verwendung mit Symfony 4.2+ deklariert das Bundle im Container eine Reihe von Aliasen für Produzenten und reguläre Verbraucher. Diese werden für die automatische Verknüpfung von Argumenten basierend auf dem deklarierten Typ und dem Argumentnamen verwendet. Auf diese Weise können Sie das vorherige Produzentenbeispiel wie folgt ändern:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
Der Name des Arguments wird aus dem Produzenten- oder Konsumentennamen der Konfiguration erstellt und je nach Typ mit einem Produzenten- oder Konsumentenwort angehängt. Im Gegensatz zur Benennungskonvention für Containerelemente wird das Wortsuffix (Produzent oder Verbraucher) nicht dupliziert, wenn der Name bereits angehängt ist. upload_picture
Produzentenschlüssel wird in den Argumentnamen $uploadPictureProducer
geändert. Der Produzentenschlüssel upload_picture_producer
würde auch als Alias für den Argumentnamen $uploadPictureProducer
verwendet. Es ist am besten, auf diese Weise ähnliche Namen zu vermeiden.
Alle Produzenten werden in der Konfiguration mit dem Alias OldSoundRabbitMqBundleRabbitMqProducerInterface
und der Producer-Klassenoption versehen. Im Sandbox-Modus werden nur ProducerInterface
Aliase erstellt. Es wird dringend empfohlen, ProducerInterface
Klasse zu verwenden, wenn Argumente für die Produzenteninjektion eingegeben werden.
Alle Verbraucher haben den Alias OldSoundRabbitMqBundleRabbitMqConsumerInterface
und den Konfigurationsoptionswert %old_sound_rabbit_mq.consumer.class%
. Es gibt keinen Unterschied zwischen regulärem und Sandbox-Modus. Es wird dringend empfohlen, ConsumerInterface
zu verwenden, wenn Sie Argumente für die Client-Injection eingeben.
Hier ist ein Beispiel für einen Rückruf:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
Wie Sie sehen, ist dies so einfach wie die Implementierung einer Methode: ConsumerInterface::execute .
Beachten Sie, dass Ihre Rückrufe als normale Symfony-Dienste registriert werden müssen . Dort können Sie den Dienstcontainer, den Datenbankdienst, den Symfony-Logger usw. einbinden.
Weitere Informationen darüber, was Teil einer Nachrichteninstanz ist, finden Sie unter https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md.
Um den Verbraucher zu stoppen, kann der Rückruf StopConsumerException
(die zuletzt konsumierte Nachricht wird nicht bestätigt) oder AckStopConsumerException
(die Nachricht wird bestätigt) auslösen. Bei Verwendung von dämonisiert, z. B. Supervisor, wird der Verbraucher tatsächlich neu gestartet.
Das allein für das Versenden von Nachrichten scheint ziemlich viel Arbeit zu sein. Fassen wir es noch einmal zusammen, um einen besseren Überblick zu bekommen. Das brauchen wir, um Nachrichten zu produzieren/konsumieren:
Und das ist es!
Dies war eine Voraussetzung für die Rückverfolgbarkeit der empfangenen/veröffentlichten Nachrichten. Um dies zu aktivieren, müssen Sie die Konfiguration enable_logger
zu Verbrauchern oder Herausgebern hinzufügen.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
Wenn Sie möchten, können Sie die Protokollierung aus Warteschlangen auch mit unterschiedlichen Handlern in Monolog behandeln, indem Sie auf den Kanal phpamqplib
verweisen.
Bisher haben wir nur Nachrichten an Verbraucher gesendet, aber was ist, wenn wir eine Antwort von ihnen erhalten möchten? Um dies zu erreichen, müssen wir RPC-Aufrufe in unsere Anwendung implementieren. Dieses Bundle macht es ziemlich einfach, solche Dinge mit Symfony zu erreichen.
Fügen wir der Konfiguration einen RPC-Client und -Server hinzu:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
Für eine vollständige Konfigurationsreferenz verwenden Sie bitte den php app/console config:dump-reference old_sound_rabbit_mq
.
Hier haben wir einen sehr nützlichen Server: Er gibt zufällige Ganzzahlen an seine Clients zurück. Der zur Verarbeitung der Anfrage verwendete Rückruf ist der Dienst random_int_server . Sehen wir uns nun an, wie wir es von unseren Controllern aus aufrufen.
Zuerst müssen wir den Server über die Befehlszeile starten:
$ ./app/console_dev rabbitmq:rpc-server random_int
Und fügen Sie dann den folgenden Code zu unserem Controller hinzu:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
Wie Sie dort sehen können, lautet der Dienstname old_sound_rabbit_mq.integer_store_rpc , wenn unsere Client-ID integer_store lautet. Sobald wir dieses Objekt erhalten haben, stellen wir eine Anfrage an den Server, indem wir addRequest
aufrufen, das drei Parameter erwartet:
Die Argumente, die wir senden, sind die Mindest- und Höchstwerte für die Funktion rand()
. Wir versenden sie, indem wir ein Array serialisieren. Wenn unser Server JSON-Informationen oder XML erwartet, senden wir diese Daten hierher.
Der letzte Schritt besteht darin, die Antwort zu erhalten. Unser PHP-Skript blockiert, bis der Server einen Wert zurückgibt. Die Variable „$replies“ ist ein assoziatives Array, in dem jede Antwort vom Server im entsprechenden Schlüssel „request_id“ enthalten ist.
Standardmäßig erwartet der RPC-Client, dass die Antwort serialisiert wird. Wenn der Server, mit dem Sie arbeiten, ein nicht serialisiertes Ergebnis zurückgibt, setzen Sie die Option expect_serialized_response
des RPC-Clients auf „false“. Wenn der integer_store -Server beispielsweise das Ergebnis nicht serialisiert, würde der Client wie folgt eingestellt:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
Sie können für die Anfrage auch einen Ablauf in Millisekunden festlegen, nach dem die Nachricht nicht mehr vom Server verarbeitet wird und die Client-Anfrage einfach abläuft. Das Festlegen des Ablaufs für Nachrichten funktioniert nur für RabbitMQ 3.x und höher. Weitere Informationen finden Sie unter http://www.rabbitmq.com/ttl.html#per-message-ttl.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
Wie Sie sich vorstellen können, können wir auch parallele RPC-Aufrufe durchführen.
Nehmen wir an, Sie müssen zum Rendern einer Webseite zwei Datenbankabfragen durchführen, von denen eine 5 Sekunden und die andere 2 Sekunden dauert – sehr teure Abfragen –. Wenn Sie sie nacheinander ausführen, ist Ihre Seite in etwa 7 Sekunden lieferbereit. Wenn Sie sie parallel ausführen, wird Ihre Seite in etwa 5 Sekunden bereitgestellt. Mit RabbitMqBundle
können wir solche parallelen Aufrufe problemlos durchführen. Definieren wir einen parallelen Client in der Konfiguration und einen weiteren RPC-Server:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
Dann sollte dieser Code in unseren Controller passen:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
Ist dem vorherigen Beispiel sehr ähnlich, wir haben nur einen zusätzlichen addRequest
Aufruf. Außerdem stellen wir aussagekräftige Anforderungskennungen bereit, damit wir später die gewünschte Antwort leichter im Array $replies finden können.
Um Direktantwort-Clients zu aktivieren, müssen Sie lediglich die Option „direct_reply_to“ in der rpc_clients -Konfiguration für den Client aktivieren.
Diese Option verwendet bei RPC-Aufrufen die Pseudowarteschlange amq.rabbitmq.reply-to . Auf dem RPC-Server sind keine Änderungen erforderlich.
RabbitMQ verfügt ab Version 3.5.0 über eine Prioritätswarteschlangenimplementierung im Kern. Jede Warteschlange kann mithilfe vom Client bereitgestellter optionaler Argumente in eine Prioritätswarteschlange umgewandelt werden (jedoch im Gegensatz zu anderen Funktionen, die optionale Argumente und keine Richtlinien verwenden). Die Implementierung unterstützt eine begrenzte Anzahl von Prioritäten: 255. Werte zwischen 1 und 10 werden empfohlen. Dokumentation prüfen
So können Sie eine Prioritätswarteschlange deklarieren
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
Wenn eine Warteschlange upload-picture
vorhanden ist, müssen Sie diese Warteschlange löschen, bevor Sie den Befehl rabbitmq:setup-fabric
ausführen
Nehmen wir nun an, Sie möchten eine Nachricht mit hoher Priorität erstellen. Sie müssen die Nachricht mit diesen zusätzlichen Informationen veröffentlichen
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
Es empfiehlt sich, zur logischen Trennung viele Warteschlangen zu haben. Bei einem einfachen Verbraucher müssen Sie einen Worker (Verbraucher) pro Warteschlange erstellen, und die Verwaltung kann bei vielen Entwicklungen schwierig sein (vergessen Sie, Ihrer Supervisord-Konfiguration eine Zeile hinzuzufügen?). Dies ist auch für kleine Warteschlangen nützlich, da Sie möglicherweise nicht so viele Mitarbeiter wie Warteschlangen haben möchten und einige Aufgaben neu gruppieren möchten, ohne die Flexibilität und das Trennungsprinzip zu verlieren.
Mit mehreren Verbrauchern können Sie diesen Anwendungsfall bewältigen, indem Sie mehrere Warteschlangen auf demselben Verbraucher abhören.
So können Sie einen Verbraucher mit mehreren Warteschlangen einrichten:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
Der Callback wird nun unter jeder Warteschlange angegeben und muss das ConsumerInterface
wie ein einfacher Consumer implementieren. Für jede Warteschlange stehen alle Optionen der queues-options
im Consumer zur Verfügung.
Beachten Sie, dass sich alle Warteschlangen unter derselben Vermittlungsstelle befinden. Es liegt an Ihnen, die richtige Weiterleitung für Rückrufe festzulegen.
Der queues_provider
ist ein optionaler Dienst, der Warteschlangen dynamisch bereitstellt. Es muss QueuesProviderInterface
implementieren.
Beachten Sie, dass Warteschlangenanbieter für die ordnungsgemäßen Aufrufe von setDequeuer
verantwortlich sind und dass Rückrufe aufrufbar sind (nicht ConsumerInterface
). Falls der Dienst, der Warteschlangen bereitstellt, DequeuerAwareInterface
implementiert, wird ein Aufruf von setDequeuer
zur Definition des Dienstes hinzugefügt, wobei ein DequeuerInterface
derzeit ein MultipleConsumer
ist.
Möglicherweise verfügt Ihre Anwendung über einen komplexen Workflow und Sie benötigen eine beliebige Bindung. Zu beliebigen Bindungsszenarien können Exchange-to-Exchange-Bindungen über die Eigenschaft destination_is_exchange
gehören.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
Der Befehl Rabbitmq:setup-fabric deklariert Austausche und Warteschlangen wie in Ihren Producer-, Consumer- und Multi-Consumer-Konfigurationen definiert, bevor er Ihre beliebigen Bindungen erstellt. Allerdings deklariert die Rabbitmq:setup-fabric KEINE zusätzlichen Warteschlangen und Austausche, die in den Bindungen definiert sind. Es liegt an Ihnen, sicherzustellen, dass Austausche/Warteschlangen deklariert werden.
Manchmal muss man die Konfiguration des Verbrauchers spontan ändern. Mit dynamischen Verbrauchern können Sie die Optionen der Verbraucherwarteschlange programmgesteuert basierend auf dem Kontext definieren.
Beispielsweise in einem Szenario, in dem der definierte Verbraucher für eine dynamische Anzahl von Themen verantwortlich sein muss und Sie seine Konfiguration nicht jedes Mal ändern möchten (oder können).
Definieren Sie einen Dienst queue_options_provider
, der QueueOptionsProviderInterface
implementiert, und fügen Sie ihn Ihrer Konfiguration dynamic_consumers
hinzu.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
Beispielverwendung:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
In diesem Fall wird der proc_logs
Consumer für server1
ausgeführt und kann über die von ihm verwendeten Warteschlangenoptionen entscheiden.
Warum werden wir nun jemals anonyme Verbraucher brauchen? Das klingt nach einer Bedrohung aus dem Internet oder so etwas … Lesen Sie weiter.
In AMQP gibt es eine Art Austausch namens Topic , bei dem die Nachrichten basierend auf – Sie vermuten – dem Thema der Nachricht an Warteschlangen weitergeleitet werden. Wir können Protokolle über unsere Anwendung an einen RabbiMQ-Themenaustausch senden und dabei als Thema den Hostnamen verwenden, auf dem das Protokoll erstellt wurde, und den Schweregrad dieses Protokolls. Der Nachrichtentext wird der Protokollinhalt sein und unsere Routing-Schlüssel werden wie folgt aussehen:
Da wir die Warteschlangen nicht mit unbegrenzten Protokollen füllen möchten, können wir zum Überwachen des Systems einen Verbraucher starten, der eine Warteschlange erstellt und sich beispielsweise basierend auf einem bestimmten Thema an den Protokollaustausch anschließt , möchten wir alle von unseren Servern gemeldeten Fehler sehen. Der Routing-Schlüssel sieht etwa so aus: #.error . In einem solchen Fall müssen wir uns einen Warteschlangennamen ausdenken, ihn an die Börse binden, die Protokolle abrufen, die Bindung aufheben und die Warteschlange löschen. Glücklicherweise bietet AMPQ eine Möglichkeit, dies automatisch zu tun, wenn Sie beim Deklarieren und Binden der Warteschlange die richtigen Optionen angeben. Das Problem ist, dass Sie sich nicht alle diese Optionen merken möchten. Aus diesem Grund haben wir das Muster „Anonymer Verbraucher“ implementiert.
Wenn wir einen anonymen Verbraucher starten, kümmert er sich um solche Details und wir müssen nur darüber nachdenken, den Rückruf zu implementieren, wenn die Nachrichten eintreffen. Heißt es „Anonymous“, weil es keinen Warteschlangennamen angibt, sondern darauf wartet, dass RabbitMQ ihm einen zufälligen Namen zuweist?
Wie kann man nun einen solchen Verbraucher konfigurieren und ausführen?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
Dort geben wir den Exchange-Namen und dessen Typ sowie den Callback an, der beim Eintreffen einer Nachricht ausgeführt werden soll.
Dieser anonyme Verbraucher ist nun in der Lage, Produzenten zuzuhören, die mit derselben Börse und demselben Thema verknüpft sind:
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
Um einen anonymen Verbraucher zu starten, verwenden wir den folgenden Befehl:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
Die einzige neue Option im Vergleich zu den Befehlen, die wir zuvor gesehen haben, ist diejenige, die den Routing-Schlüssel angibt: -r '#.error'
.
In manchen Fällen möchten Sie einen Stapel von Nachrichten abrufen und diese dann alle verarbeiten. Mit Batch-Consumern können Sie Logik für diese Art der Verarbeitung definieren.
Beispiel: Stellen Sie sich vor, Sie haben eine Warteschlange, in der Sie eine Nachricht zum Einfügen einiger Informationen in die Datenbank erhalten, und stellen fest, dass es viel besser ist, eine Stapeleinfügung durchzuführen, als sie einzeln einzufügen.
Definieren Sie einen Rückrufdienst, der BatchConsumerInterface
implementiert, und fügen Sie die Definition des Verbrauchers zu Ihrer Konfiguration hinzu.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
Hinweis : Wenn die Option keep_alive
auf true
gesetzt ist, wird idle_timeout_exit_code
ignoriert und der Verbraucherprozess wird fortgesetzt.
Sie können einen Batch-Consumer implementieren, der alle Nachrichten auf einmal bestätigt, oder Sie können steuern, welche Nachricht bestätigt werden soll.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
So führen Sie den folgenden Batch-Consumer aus:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
Wichtig: Für BatchConsumers ist die Option -m|messages
nicht verfügbar. Wichtig: Für BatchConsumers ist auch die Option -b|batches
verfügbar, wenn Sie nur eine bestimmte Anzahl von Batches konsumieren und den Consumer dann stoppen möchten. Geben Sie die Anzahl der Batches nur an, wenn Sie möchten, dass der Verbraucher stoppt, nachdem diese Batch-Nachrichten verbraucht wurden!
Es gibt einen Befehl, der Daten aus STDIN liest und sie in einer RabbitMQ-Warteschlange veröffentlicht. Um es zunächst zu verwenden, müssen Sie in Ihrer Konfigurationsdatei einen producer
wie folgt konfigurieren:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
Dieser Produzent wird Nachrichten zum words
Direktaustausch veröffentlichen. Natürlich können Sie die Konfiguration nach Ihren Wünschen anpassen.
Nehmen wir dann an, Sie möchten den Inhalt einiger XML-Dateien veröffentlichen, damit sie von einer Verbraucherfarm verarbeitet werden. Sie könnten sie veröffentlichen, indem Sie einfach einen Befehl wie diesen verwenden:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
Dies bedeutet, dass Sie Produzenten mit einfachen Unix-Befehlen erstellen können.
Lassen Sie uns diesen einen Liner zerlegen:
$ find vendor/symfony/ -name " *.xml " -print0
Dieser Befehl findet alle .xml
Dateien im Symfony-Ordner und gibt den Dateinamen aus. Jeder dieser Dateinamen wird dann über xargs
an cat
weitergeleitet :
$ xargs -0 cat
Und schließlich geht die Ausgabe von cat
direkt an unseren Produzenten, der wie folgt aufgerufen wird:
$ ./app/console rabbitmq:stdin-producer words
Es benötigt nur ein Argument, nämlich den Namen des Produzenten, wie Sie ihn in Ihrer config.yml
Datei konfiguriert haben.
Der Zweck dieses Pakets besteht darin, dass Ihre Anwendung Nachrichten erstellen und diese an einigen von Ihnen konfigurierten Börsen veröffentlichen kann.
In einigen Fällen und selbst wenn Ihre Konfiguration richtig ist, werden die von Ihnen erstellten Nachrichten nicht an eine Warteschlange weitergeleitet, da keine vorhanden ist. Damit die Warteschlange erstellt werden kann, muss der für den Warteschlangenverbrauch verantwortliche Verbraucher ausgeführt werden.
Das Starten eines Befehls für jeden Verbraucher kann ein Albtraum sein, wenn die Anzahl der Verbraucher hoch ist.
Um Austausche, Warteschlangen und Bindungen gleichzeitig zu erstellen und sicherzustellen, dass keine Nachrichten verloren gehen, können Sie den folgenden Befehl ausführen:
$ ./app/console rabbitmq:setup-fabric
Bei Bedarf können Sie Ihre Verbraucher und Produzenten so konfigurieren, dass sie davon ausgehen, dass die RabbitMQ-Fabric bereits definiert ist. Fügen Sie dazu Folgendes zu Ihrer Konfiguration hinzu:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
Standardmäßig deklariert ein Verbraucher oder Produzent beim Start alles, was er mit RabbitMQ benötigt. Gehen Sie dabei vorsichtig vor, da es zu Fehlern kommt, wenn Austausche oder Warteschlangen nicht definiert sind. Wenn Sie eine Konfiguration geändert haben, müssen Sie den obigen Setup-Fabric-Befehl ausführen, um Ihre Konfiguration zu deklarieren.
Um einen Beitrag zu leisten, öffnen Sie einfach eine Pull-Anfrage mit Ihrem neuen Code. Beachten Sie dabei, dass Sie in dieser README-Datei dokumentieren müssen, was sie tun, wenn Sie neue Funktionen hinzufügen oder bestehende ändern. Wenn Sie BC brechen, müssen Sie dies ebenfalls dokumentieren. Außerdem müssen Sie das CHANGELOG aktualisieren. Also:
Siehe: resources/meta/LICENSE.md
Der Bundle-Aufbau und die Dokumentation basieren teilweise auf dem RedisBundle