This library is a pure PHP implementation of the AMQP 0-9-1 protocol. ได้รับการทดสอบกับ RabbitMQ
ห้องสมุดถูกใช้สำหรับตัวอย่าง PHP ของ RabbitMQ ในการดำเนินการและบทช่วยสอน RabbitMQ อย่างเป็นทางการ
โปรดทราบว่าโครงการนี้ได้รับการปล่อยตัวพร้อมกับจรรยาบรรณของผู้สนับสนุน โดยการเข้าร่วมในโครงการนี้คุณตกลงที่จะปฏิบัติตามข้อกำหนดของมัน
Thanks to videlalvaro and postalservice14 for creating php-amqplib
.
แพ็คเกจนี้ได้รับการดูแลโดยRamūnas Dronga, Luke Bakken และวิศวกร VMware หลายคนที่ทำงานกับ RabbitMQ
Starting with version 2.0 this library uses AMQP 0.9.1
by default and thus requires RabbitMQ 2.0 or later version. โดยปกติแล้วการอัพเกรดเซิร์ฟเวอร์ไม่จำเป็นต้องมีการเปลี่ยนแปลงรหัสแอปพลิเคชันใด ๆ เนื่องจากโปรโตคอลมีการเปลี่ยนแปลงไม่บ่อยนัก แต่โปรดทำการทดสอบของคุณเองก่อนที่จะอัพเกรด
Since the library uses AMQP 0.9.1
we added support for the following RabbitMQ extensions:
แลกเปลี่ยนเพื่อแลกเปลี่ยนการผูก
นาคพื้นฐาน
ผู้เผยแพร่ยืนยัน
ผู้บริโภคยกเลิกการแจ้งเตือน
Extensions that modify existing methods like alternate exchanges
are also supported.
Enqueue/AMQP-LIB เป็นเสื้อคลุมที่เข้ากันได้กับ AMQP
Amqproxy เป็นห้องสมุดพร็อกซีที่มีการเชื่อมต่อและการรวมช่องทาง/การนำกลับมาใช้ใหม่ สิ่งนี้จะช่วยให้การเชื่อมต่อที่ต่ำกว่าและช่องสัญญาณปั่นป่วนเมื่อใช้ PHP-AMQPLIB ซึ่งนำไปสู่การใช้ CPU น้อยลงของ RabbitMQ
ตรวจสอบให้แน่ใจว่าคุณติดตั้งนักแต่งเพลงจากนั้นเรียกใช้คำสั่งต่อไปนี้:
$ composer ต้องการ php-amqplib/php-amqplib
ที่จะดึงไลบรารีและการพึ่งพาภายในโฟลเดอร์ผู้ขายของคุณ จากนั้นคุณสามารถเพิ่มไฟล์. php ต่อไปนี้เพื่อใช้ไลบรารี
require_once __dir __. '/ผู้ขาย/autoload.php';
Then you need to use
the relevant classes, for example:
ใช้ phpamqplibconnectionamqpstreamconnection ใช้ phpamqplibmessageamqpmessage;
ด้วย RabbitMQ ที่ใช้งานเปิดสองเทอร์มินัลและในคำสั่งแรกเรียกใช้คำสั่งต่อไปนี้เพื่อเริ่มผู้บริโภค:
$ cd php-amqplib/demo $ php amqp_consumer.php
จากนั้นในเทอร์มินัลอื่นทำ:
$ cd php-amqplib/demo $ php amqp_publisher.php บางข้อความที่จะเผยแพร่
คุณควรเห็นข้อความที่มาถึงกระบวนการในเทอร์มินัลอื่น ๆ
Then to stop the consumer, send to it the quit
message:
$ php amqp_publisher.php ออกจาก
หากคุณต้องการฟังซ็อกเก็ตที่ใช้เชื่อมต่อกับ RabbitMQ จากนั้นดูตัวอย่างในผู้บริโภคที่ไม่ปิดกั้น
$ php amqp_consumer_non_blocking.php
โปรดดู Changelog สำหรับข้อมูลเพิ่มเติมสิ่งที่เปลี่ยนแปลงเมื่อเร็ว ๆ นี้
http://php-amqplib.github.io/php-amqplib/
หากต้องการทำซ้ำตัวเองหากคุณต้องการเรียนรู้เพิ่มเติมเกี่ยวกับห้องสมุดนี้โปรดดูบทเรียน RabbitMQ อย่างเป็นทางการ
amqp_ha_consumer.php
: demos the use of mirrored queues.
amqp_consumer_exclusive.php
and amqp_publisher_exclusive.php
: demos fanout exchanges using exclusive queues.
amqp_consumer_fanout_{1,2}.php
and amqp_publisher_fanout.php
: demos fanout exchanges with named queues.
amqp_consumer_pcntl_heartbeat.php
: demos signal-based heartbeat sender usage.
basic_get.php
: demos obtaining messages from the queues by using the basic get AMQP call.
หากคุณมีคลัสเตอร์หลายโหนดที่แอปพลิเคชันของคุณสามารถเชื่อมต่อได้คุณสามารถเริ่มการเชื่อมต่อด้วยอาร์เรย์ของโฮสต์ To do that you should use the create_connection
static method.
ตัวอย่างเช่น:
$ connection = amqpstreamconnection :: create_connection ([[ ['host' => host1, 'พอร์ต' => พอร์ต, 'ผู้ใช้' => ผู้ใช้, 'รหัสผ่าน' => ผ่าน, 'vhost' => vhost], ['host' => host2, 'พอร์ต' => พอร์ต, 'ผู้ใช้' => ผู้ใช้, 'รหัสผ่าน' => ผ่าน, 'vhost' => vhost] ], $ ตัวเลือก);
This code will try to connect to HOST1
first, and connect to HOST2
if the first connection fails. วิธีการส่งคืนวัตถุการเชื่อมต่อสำหรับการเชื่อมต่อที่ประสบความสำเร็จครั้งแรก หากการเชื่อมต่อทั้งหมดล้มเหลวมันจะโยนข้อยกเว้นจากความพยายามในการเชื่อมต่อครั้งล่าสุด
See demo/amqp_connect_multiple_hosts.php
for more examples.
Let's say you have a process that generates a bunch of messages that are going to be published to the same exchange
using the same routing_key
and options like mandatory
. Then you could make use of the batch_basic_publish
library feature. คุณสามารถแบทช์ข้อความเช่นนี้:
$ msg = ใหม่ amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg, $ exchange); $ msg2 = ใหม่ amqpmessage ($ msg_body); $ ch-> batch_basic_publish ($ msg2, $ exchange);
จากนั้นส่งแบทช์แบบนี้:
$ ch-> publish_batch ();
สมมติว่าโปรแกรมของเราต้องอ่านจากไฟล์แล้วเผยแพร่หนึ่งข้อความต่อบรรทัด ขึ้นอยู่กับขนาดของข้อความคุณจะต้องตัดสินใจว่าจะส่งแบทช์ขึ้นเมื่อใด คุณสามารถส่งทุก ๆ 50 ข้อความหรือทุกร้อย ขึ้นอยู่กับคุณ
Another way to speed up your message publishing is by reusing the AMQPMessage
message instances. คุณสามารถสร้างข้อความใหม่ของคุณเช่นนี้:
$ properties = array ('content_type' => 'text/plain', 'delivery_mode' => amqpmessage :: delivery_mode_persistent); $ msg = ใหม่ amqpmessage ($ body, $ properties); $ ch-> basic_publish ($ msg, $ แลกเปลี่ยน);
Now let's say that while you want to change the message body for future messages, you will keep the same properties, that is, your messages will still be text/plain
and the delivery_mode
will still be AMQPMessage::DELIVERY_MODE_PERSISTENT
. If you create a new AMQPMessage
instance for every published message, then those properties would have to be re-encoded in the AMQP binary format. You could avoid all that by just reusing the AMQPMessage
and then resetting the message body like this:
$ msg-> setbody ($ body2); $ ch-> masic_publish ($ msg, $ exchange);
AMQP ไม่ จำกัด ขนาดของข้อความ if a very large message is received by a consumer, PHP's memory limit may be reached within the library before the callback passed to basic_consume
is called.
To avoid this, you can call the method AMQPChannel::setBodySizeLimit(int $bytes)
on your Channel instance. Body sizes exceeding this limit will be truncated, and delivered to your callback with a AMQPMessage::$is_truncated
flag set to true
. The property AMQPMessage::$body_size
will reflect the true body size of a received message, which will be higher than strlen(AMQPMessage::getBody())
if the message has been truncated.
โปรดทราบว่าข้อมูลทั้งหมดที่สูงกว่าขีด จำกัด จะถูกอ่านจากช่อง AMQP และทิ้งทันทีดังนั้นจึงไม่มีวิธีดึงข้อมูลภายในการโทรกลับของคุณ If you have another consumer which can handle messages with larger payloads, you can use basic_reject
or basic_nack
to tell the server (which still has a complete copy) to forward it to a Dead Letter Exchange.
โดยค่าเริ่มต้นจะไม่มีการตัดทอน To disable truncation on a Channel that has had it enabled, pass 0
(or null
) to AMQPChannel::setBodySizeLimit()
.
ไคลเอนต์ RabbitMQ บางรายที่ใช้กลไกการกู้คืนการเชื่อมต่ออัตโนมัติเพื่อเชื่อมต่อและกู้คืนช่องทางและผู้บริโภคในกรณีที่เกิดข้อผิดพลาดเครือข่าย
เนื่องจากไคลเอนต์นี้ใช้เธรดเดียวคุณสามารถตั้งค่าการกู้คืนการเชื่อมต่อโดยใช้กลไกการจัดการข้อยกเว้น
ข้อยกเว้นที่อาจถูกโยนทิ้งในกรณีที่เกิดข้อผิดพลาดในการเชื่อมต่อ:
phpamqplibexceptionamqpconnectionclosedexceptionphpamqplibexceptionamqpioexceptionruntimexceptionerrorexception
ข้อยกเว้นอื่น ๆ อาจถูกโยนทิ้ง แต่การเชื่อมต่อยังคงอยู่ที่นั่น เป็นความคิดที่ดีที่จะทำความสะอาดการเชื่อมต่อเก่า ๆ เมื่อจัดการข้อยกเว้นก่อนที่จะเชื่อมต่อใหม่
ตัวอย่างเช่นหากคุณต้องการตั้งค่าการเชื่อมต่อที่กู้คืน:
$ connection = null; $ channel = null; ในขณะที่ (จริง) {ลอง {$ connection = new AmqpStreamConnection (โฮสต์, พอร์ต, ผู้ใช้, ผ่าน, vhost); // รหัสแอปพลิเคชันของคุณไปที่นี่ } catch (amqpruntimeException $ e) {echo $ e-> getMessage (); cleanup_connection ($ การเชื่อมต่อ); usleep (wait_before_reconnect_us); } catch (runtimeException $ e) {cleanup_connection ($ การเชื่อมต่อ); usleep (wait_before_reconnect_us); } catch (errorexception $ e) {cleanup_connection ($ การเชื่อมต่อ); usleep (wait_before_reconnect_us); - -
A full example is in demo/connection_recovery_consume.php
.
รหัสนี้จะเชื่อมต่อใหม่และลองรหัสแอปพลิเคชันอีกครั้งทุกครั้งที่มีข้อยกเว้นเกิดขึ้น ข้อยกเว้นบางประการยังสามารถโยนได้และไม่ควรจัดการเป็นส่วนหนึ่งของกระบวนการเชื่อมต่อใหม่เนื่องจากอาจเป็นข้อผิดพลาดของแอปพลิเคชัน
วิธีการนี้เหมาะสมสำหรับแอปพลิเคชันผู้บริโภคส่วนใหญ่ผู้ผลิตจะต้องใช้รหัสแอปพลิเคชันเพิ่มเติมเพื่อหลีกเลี่ยงการเผยแพร่ข้อความเดียวกันหลายครั้ง
นี่เป็นตัวอย่างที่ง่ายที่สุดในแอปพลิเคชันในชีวิตจริงคุณอาจต้องการควบคุมจำนวน REL และอาจลดเวลารออย่างสง่างามในการเชื่อมต่อใหม่
คุณสามารถค้นหาตัวอย่างที่มากเกินไปใน #444
หากคุณติดตั้งการส่งสัญญาณ PCNTL ของสัญญาณจะได้รับการจัดการเมื่อผู้บริโภคไม่ได้ประมวลผลข้อความ
$ pcntlhandler = function ($ สัญญาณ) {switch ($ signal) {case sigterm: case sigusr1: case sigint: // บางสิ่งบางอย่างก่อนหยุดผู้บริโภคเช่นลบล็อค etcpcntl_signal ($ signal, sig_dfl); // คืนค่า handlerposix_kill (posix_getpid (), $ สัญญาณ); // ฆ่าตัวเองด้วยสัญญาณดู https://www.cons.org/cracauer/sigint.htmlcase Sighup: // บางสิ่งบางอย่างเพื่อรีสตาร์ท Consumerbreak; }; pcntl_signal (sigterm, $ pcntlhandler); pcntl_signal (sigint, $ pcntlhandler); pcntl_signal (sigusr1, $ pcntlhandler); pcntl_signal (sighup, $ pcntlhandler);
To disable this feature just define constant AMQP_WITHOUT_SIGNALS
as true
<? phpdefine ('amqp_without_signals', true); ... รหัสเพิ่มเติม
หากคุณติดตั้งส่วนขยาย PCNTL และใช้ PHP 7.1 หรือมากกว่าคุณสามารถลงทะเบียนผู้ส่ง HeartBeat ตามสัญญาณ
<? php $ sender = ใหม่ pcntlheartbeatsender ($ การเชื่อมต่อ); $ sender-> register (); ... รหัส $ sender-> unregister ();
หากคุณต้องการทราบว่าเกิดอะไรขึ้นในระดับโปรโตคอลให้เพิ่มค่าคงที่ต่อไปนี้ลงในรหัสของคุณ:
<? phpdefine ('amqp_debug', true); ... รหัสเพิ่มเติม?>
ในการเรียกใช้ประเภทมาตรฐานการเผยแพร่/บริโภค:
$ Make Benchmark
โปรดดูรายละเอียดที่มีส่วนร่วม
หากคุณยังต้องการใช้โปรโตคอลเวอร์ชันเก่าคุณสามารถทำได้โดยการตั้งค่าคงที่ต่อไปนี้ในรหัสการกำหนดค่าของคุณ:
กำหนด ('amqp_protocol', '0.8');
The default value is '0.9.1'
.
หากด้วยเหตุผลบางอย่างที่คุณไม่ต้องการใช้นักแต่งเพลงคุณจะต้องมี Autoloader ในชั้นเรียนของห้องสมุด ผู้คนได้รายงานว่าใช้ Autoloader นี้ด้วยความสำเร็จ
ด้านล่างเป็นเนื้อหาไฟล์ readme ดั้งเดิม เครดิตไปที่ผู้เขียนดั้งเดิม
PHP Library การใช้โปรโตคอลการเข้าคิวข้อความขั้นสูง (AMQP)
ไลบรารีคือพอร์ตของรหัส Python ของ py-amqplib http://barryp.org/software/py-amqplib/
มันได้รับการทดสอบกับเซิร์ฟเวอร์ RabbitMQ
หน้าแรกของโครงการ: http://code.google.com/p/php-amqplib/
สำหรับการสนทนาโปรดเข้าร่วมกลุ่ม:
http://groups.google.com/group/php-amqplib-dev
สำหรับรายงานข้อผิดพลาดโปรดใช้ระบบติดตามข้อผิดพลาดที่หน้าโครงการ
แพทช์ยินดีต้อนรับมาก!
ผู้แต่ง: vadim zaliva [email protected]