Diese Bibliothek ist eine reine PHP- Implementierung des AMQP 0-9-1-Protokolls. Es wurde gegen Rabbitmq getestet.
Die Bibliothek wurde für die PHP -Beispiele von Rabbitmq in Aktion und den offiziellen Rabbitmq -Tutorials verwendet.
Bitte beachten Sie, dass dieses Projekt mit einem Verhaltenskodex von Mitwirkenden veröffentlicht wird. Wenn Sie an diesem Projekt teilnehmen, erklären Sie sich damit einverstanden, sich an seine Bedingungen einzuhalten.
Vielen Dank an Videlalvaro und Postalservice14 für die Erstellung von php-amqplib
.
Das Paket wird nun von Ramūnas Dronga, Luke Bakken und mehreren VMware -Ingenieuren unterhalten, die auf Rabbitmq arbeiten.
Beginnend mit Version 2.0 verwendet diese Bibliothek standardmäßig AMQP 0.9.1
und erfordert somit Rabbitmq 2.0 oder später. Normalerweise benötigen Server -Upgrades keine Änderungen des Anwendungscode, da das Protokoll sehr selten ändert, aber bitte Ihre eigenen Tests vor dem Upgrade durchführen.
Da die Bibliothek AMQP 0.9.1
verwendet, haben wir Unterstützung für die folgenden Rabbitmq -Erweiterungen hinzugefügt:
Austausch, um Bindungen auszutauschen
Basic Nack
Verlag bestätigt
Verbraucherabbrechen benachrichtigen
Erweiterungen, die vorhandene Methoden wie alternate exchanges
ändern, werden ebenfalls unterstützt.
Enqueue/AMQP-Lib ist ein AMQP-Interop-kompatibler Wrapper.
AmqProxy ist eine Proxy -Bibliothek mit Verbindungs- und Kanalpooling/Wiederverwendung. Dies ermöglicht eine niedrigere Verbindung und Kanalwanderung bei der Verwendung von PHP-AMQPLIB, was zu einer geringeren CPU-Verwendung von Rabbitmq führt.
Stellen Sie sicher, dass der Komponist installiert ist, und führen Sie den folgenden Befehl aus:
$ Composer benötigt PHP-AMQPLIB/PHP-AMQPLIB
Dadurch wird die Bibliothek und ihre Abhängigkeiten in Ihrem Lieferantenordner abgerufen. Dann können Sie Ihren .php -Dateien Folgendes hinzufügen, um die Bibliothek zu verwenden
required_once __dir __. '/vendor/autoload.php';
Dann müssen Sie die entsprechenden Klassen use
, z. B.:
Verwenden Sie PHPAMQPLIBConnectionAmqpStreamConnection; Verwenden Sie PhpaMQPlibMessageamqpMessage;
Wenn Rabbitmq zwei Terminals öffnen und auf der ersten die folgenden Befehle ausführen, um den Verbraucher zu starten:
$ CD PHP-AMQPLIB/Demo $ PHP AMQP_CONSUMER.PHP
Dann auf dem anderen Terminal:
$ CD PHP-AMQPLIB/Demo $ php amqp_publisher.php Ein paar Text zur Veröffentlichung
Sie sollten die Nachricht sehen, die zum Prozess am anderen Terminal ankommt
Um den Verbraucher zu stoppen, senden Sie ihm die quit
an ihn:
$ PHP amqp_publisher.php Quit
Wenn Sie die Sockets anhören müssen, die zur Verbindung zu Rabbitmq verwendet werden, sehen Sie das Beispiel im nicht blockierenden Verbraucher.
$ PHP AMQP_CONSUMER_NON_BLOCKING.PHP
Weitere Informationen finden Sie in letzter Zeit, was in letzter Zeit geändert wurde.
http://php-amqplib.github.io/php-amqplib/
Um uns nicht zu wiederholen, wenn Sie mehr über diese Bibliothek erfahren möchten, erhalten Sie bitte die offiziellen Rabbitmq -Tutorials.
amqp_ha_consumer.php
: Demos die Verwendung von gespiegelten Warteschlangen.
amqp_consumer_exclusive.php
und amqp_publisher_exclusive.php
: Demos Fanout -Börsen mit exklusiven Warteschlangen.
amqp_consumer_fanout_{1,2}.php
und amqp_publisher_fanout.php
: Demos Fanout -Börsen mit benannten Warteschlangen.
amqp_consumer_pcntl_heartbeat.php
: DEMOS-Signal-basierter Heartbeat-Absender.
basic_get.php
: Demos, das Nachrichten aus den Warteschlangen erhalten, indem sie den Basic Get AMQP -Aufruf verwenden.
Wenn Sie eine Gruppe mehrerer Knoten haben, mit denen sich Ihre Anwendung verbinden kann, können Sie eine Verbindung mit einer Reihe von Hosts starten. Dazu sollten Sie die statische Methode create_connection
verwenden.
Zum Beispiel:
$ connection = amqpstreamConnection :: create_connection ([[ ['host' => host1, 'port' => port, 'user' => user, 'password' => pass, 'vhost' => vhost], ['host' => host2, 'port' => port, 'user' => user, 'password' => pass, 'vhost' => vhost] ], $ option);
Dieser Code versucht zuerst eine Verbindung zu HOST1
und eine Verbindung zu HOST2
herzustellen, wenn die erste Verbindung fehlschlägt. Die Methode gibt ein Verbindungsobjekt für die erste erfolgreiche Verbindung zurück. Sollte alle Verbindungen scheitern, wird die Ausnahme vom letzten Verbindungsversuch ausgelöst.
Weitere Beispiele finden Sie demo/amqp_connect_multiple_hosts.php
.
Nehmen wir an, Sie haben einen Prozess, der eine Reihe von Nachrichten generiert, die mit demselben routing_key
und Optionen wie mandatory
an denselben exchange
veröffentlicht werden. Anschließend können Sie die Feature batch_basic_publish
Library verwenden. Sie können solche Nachrichten stapeln:
$ msg = new amqpMessage ($ msg_body); $ ch-> batch_basic_publish ($ msg, $ exchange); $ msg2 = new amqpMessage ($ msg_body); $ ch-> batch_basic_publish ($ msg2, $ Exchange);
Und dann schicken Sie die Charge wie diese:
$ ch-> publish_batch ();
Angenommen, unser Programm muss aus einer Datei lesen und dann eine Nachricht pro Zeile veröffentlichen. Abhängig von der Nachrichtengröße müssen Sie entscheiden, wann es besser ist, die Charge zu senden. Sie könnten es alle 50 Nachrichten oder alle hundert senden. Das liegt an dir.
Eine andere Möglichkeit, Ihre Nachrichtenveröffentlichung zu beschleunigen, besteht darin, die AMQPMessage
-Nachrichteninstanzen wiederzuverwenden. Sie können Ihre neue Nachricht wie folgt erstellen:
$ properties = array ('content_type' => 'text/plain', 'lieferung_mode' => amqpMessage :: lieferung_mode_persist); Austausch);
Nehmen wir nun an, dass Sie zwar die Nachricht für zukünftige Nachrichten ändern möchten, die gleichen Eigenschaften, das heißt, Ihre Nachrichten weiterhin text/plain
und der delivery_mode
sind immer noch AMQPMessage::DELIVERY_MODE_PERSISTENT
. Wenn Sie für jede veröffentlichte Nachricht eine neue AMQPMessage
Instanz erstellen, müssten diese Eigenschaften im AMQP-Binärformat neucodiert werden. Sie könnten all das vermeiden, indem Sie nur die AMQPMessage
wiederverwenden und dann den Meldungskörper wie folgt zurücksetzen:
$ msg-> setbody ($ body2); $ ch-> basic_publish ($ msg, $ Exchange);
AMQP legt die Größe der Nachrichten keine Begrenzung vor. Wenn eine sehr große Nachricht von einem Verbraucher empfangen wird, kann in der Bibliothek das Speicherlimit von PHP erreicht werden, bevor der Rückruf an basic_consume
aufgerufen wird.
Um dies zu vermeiden, können Sie die Methode AMQPChannel::setBodySizeLimit(int $bytes)
auf Ihrer Kanalinstanz aufrufen. Körpergrößen, die diese Grenze überschreiten, werden abgeschnitten und mit einem AMQPMessage::$is_truncated
Flag, der auf true
eingestellt ist, geliefert. Die Eigenschaft AMQPMessage::$body_size
spiegelt die wahre Körpergröße einer empfangenen Nachricht wider, die höher ist als strlen(AMQPMessage::getBody())
wenn die Nachricht abgeschnitten wurde.
Beachten Sie, dass alle Daten über dem Grenze aus dem AMQP -Kanal gelesen und sofort verworfen werden. Es gibt also keine Möglichkeit, ihn in Ihrem Rückruf abzurufen. Wenn Sie einen anderen Verbraucher haben, der Nachrichten mit größeren Nutzlasten verarbeiten kann, können Sie basic_reject
oder basic_nack
verwenden, um den Server (der noch eine vollständige Kopie enthält) an einen toten Buchstaben zu leiten.
Standardmäßig tritt keine Kürzung auf. Um die Kürzung auf einem Kanal zu deaktivieren, der es aktiviert hat, übergeben Sie 0
(oder null
) an AMQPChannel::setBodySizeLimit()
.
Einige RabbitMQ -Clients verwenden automatisierte Verbindungswiederherstellungsmechanismen, um Kanäle und Verbraucher im Falle von Netzwerkfehlern wiederherzustellen und wiederherzustellen.
Da dieser Client ein Einzel-Thread verwendet, können Sie die Verbindungswiederherstellung mit dem Ausnahmebehandlungsmechanismus einrichten.
Ausnahmen, die bei Verbindungsfehlern geworfen werden können:
PHPAMQPLIBExceptionAmqpConnectionCLEOSEDExceptionPAMQPLIBExceptionAmQPIOExceptionRuntimexceptionErRorexception
Einige andere Ausnahmen könnten geworfen werden, aber die Verbindung kann immer noch da sein. Es ist immer eine gute Idee, eine alte Verbindung bei der Behandlung einer Ausnahme vor dem Wiederverbinden aufzuräumen.
Zum Beispiel, wenn Sie eine Wiederherstellungsverbindung einrichten möchten:
$ connection = null; $ Channel = null; while (true) {try {$ connection = new amqpstreamConnection (Host, Port, Benutzer, Pass, Vhost); // Ihr Anwendungscode geht hier.do_Sometht_with_Connection ($ connection); } catch (amqpruntimeexception $ e) {echo $ e-> getMessage (); CleanUp_Connection ($ connection); usleep (wait_before_reconnect_us); } catch (runTimeException $ e) {CleanUp_Connection ($ connection); usleep (wait_before_reconnect_us); } catch (errorexception $ e) {CleanUp_Connection ($ connection); usleep (wait_before_reconnect_us); } }
Ein vollständiges Beispiel finden Sie in demo/connection_recovery_consume.php
.
Dieser Code verbindet den Anwendungscode jedes Mal, wenn die Ausnahme eintritt. Einige Ausnahmen können weiterhin geworfen werden und sollten nicht als Teil des Wiederverbindungsprozesses behandelt werden, da sie möglicherweise Anwendungsfehler sind.
Dieser Ansatz ist für Verbraucheranwendungen hauptsächlich sinnvoll. Die Hersteller benötigen einen zusätzlichen Anwendungscode, um die Veröffentlichung derselben Nachricht mehrmals zu vermeiden.
Dies war ein einfachste Beispiel. In einer realen Anwendung möchten Sie möglicherweise die Retrum-Anzahl steuern und möglicherweise die Wartezeit für die Wiederverbindung abbauen.
Sie können in #444 ein übermäßiges Beispiel finden
Wenn Sie die PCNNTL -Erweiterung des Signals installiert haben, wird das Versand von Signalen behandelt, wenn der Verbraucher keine Meldung verarbeitet.
$ pcntlhandler = function ($ signal) {switch ($ signal) {case sigterm: case sigusr1: case sigint: // Einige Dinge vor Stoppen der Verbraucher, z. // HandlerPoSix_kill (posix_getPid (), $ signal) restaure; // Selbst mit Signal töten, siehe https://www.cons.org/cracauer/sigint.htmlcase seufz: // Einige Dinge zum Neustarten von Consumerbreak; Standard: // nichts tun} }; pcntl_signal (sigterm, $ pcntlHandler); pcntl_signal (sigint, $ pcntlHandler); pcntl_signal (sigusr1, $ pcntlHandler); pcntl_signal (Sighup, $ pcntlhandler);
true
diese AMQP_WITHOUT_SIGNALS
zu deaktivieren
<? phpDefine ('amqp_without_signals', true); ... mehr Code
Wenn Sie die PCNTL-Erweiterung installiert haben und PHP 7.1 oder höher verwenden, können Sie einen signifikanten Herzschlag-Absender registrieren.
<? php $ sender = new pcntlheartbeatsender ($ connection); $ sender-> register ();
Wenn Sie wissen möchten, was auf einer Protokollebene vor sich geht, fügen Sie Ihrem Code die folgende Konstante hinzu:
<? phpDefine ('amqp_debug', true); ... mehr Code?>
So führen Sie den Veröffentlichungs-/Verbrauch von Benchmark -Typ aus:
$ make Benchmark
Weitere Informationen finden Sie unter Beitrag.
Wenn Sie die alte Version des Protokolls weiterhin verwenden möchten, können Sie dies tun, indem Sie die folgende Konstante in Ihrem Konfigurationscode festlegen:
define ('amqp_protocol', '0,8');
Der Standardwert ist '0.9.1'
.
Wenn Sie aus irgendeinem Grund keinen Komponisten verwenden möchten, müssen Sie einen Autoloader für die Bibliotheksklassen an Ort und Stelle haben. Die Menschen haben berichtet, diesen Autoloader mit Erfolg zu verwenden.
Unten finden Sie den ursprünglichen Inhalt der Readme -Datei. Die Credits gehen an die ursprünglichen Autoren.
PHP -Bibliothek implementiert erweitertes Message Queuing Protocol (AMQP).
Die Bibliothek ist Port des Python-Codes von Py-amqplib http://barryp.org/software/py-amqplib/
Es wurde mit Rabbitmq Server getestet.
Projekt Homepage: http://code.google.com/p/php-amqplib/
Zur Diskussion schließen Sie sich der Gruppe an:
http://groups.google.com/group/php-amqplib-devel
Für Fehlerberichte verwenden Sie bitte das Fehlerverfolgungssystem auf der Projektseite.
Patches sind sehr willkommen!
Autor: Vadim Zaliva [email protected]