Perpustakaan ini adalah implementasi PHP murni dari protokol AMQP 0-9-1. Itu telah diuji terhadap RabbitMQ.
Perpustakaan digunakan untuk contoh PHP RabbitMQ dalam aksi dan tutorial RabbitMQ resmi.
Harap dicatat bahwa proyek ini dirilis dengan kode perilaku kontributor. Dengan berpartisipasi dalam proyek ini, Anda setuju untuk mematuhi persyaratannya.
Terima kasih kepada Videlalvaro dan PossalService14 untuk membuat php-amqplib
.
Paket ini sekarang dikelola oleh Ramūnas Dronga, Luke Bakken dan beberapa insinyur VMware yang bekerja pada RabbitMQ.
Dimulai dengan Versi 2.0 Pustaka ini menggunakan AMQP 0.9.1
secara default dan dengan demikian membutuhkan versi RabbitMQ 2.0 atau yang lebih baru. Biasanya peningkatan server tidak memerlukan perubahan kode aplikasi karena perubahan protokol sangat jarang tetapi harap lakukan pengujian Anda sendiri sebelum meningkatkan.
Karena perpustakaan menggunakan AMQP 0.9.1
kami menambahkan dukungan untuk ekstensi RabbitMQ berikut:
Pertukaran ke binding pertukaran
Nack dasar
Penerbit mengkonfirmasi
Konsumen membatalkan notify
Ekstensi yang memodifikasi metode yang ada seperti alternate exchanges
juga didukung.
ENQUEUE/AMQP-LIB adalah pembungkus kompatibel AMQP Interop.
AMQProxy adalah pustaka proxy dengan koneksi dan pengumpulan/penggunaan kembali saluran. Hal ini memungkinkan koneksi yang lebih rendah dan churn churn saat menggunakan php-amqPlib, yang mengarah ke lebih sedikit penggunaan CPU RabbitMQ.
Pastikan Anda telah menginstal komposer, lalu jalankan perintah berikut:
$ komposer membutuhkan php-amqplib/php-amqplib
Itu akan mengambil perpustakaan dan ketergantungannya di dalam folder vendor Anda. Kemudian Anda dapat menambahkan yang berikut ini ke file .php Anda untuk menggunakan perpustakaan
membutuhkan_once __dir __. '/vendor/autoload.php';
Maka Anda perlu use
kelas yang relevan, misalnya:
Gunakan PHPAMQPLIBConnectionamQPStreamConnection; gunakan pHPAMQPlibMessageamQPMessage;
Dengan RabbitMQ berjalan terbuka dua terminal dan pada yang pertama menjalankan perintah berikut untuk memulai konsumen:
$ CD PHP-AMQPLIB/DEMO $ php amqp_consumer.php
Kemudian di terminal lain lakukan:
$ CD PHP-AMQPLIB/DEMO $ php amqp_publisher.php beberapa teks untuk diterbitkan
Anda akan melihat pesan yang tiba di proses di terminal lain
Kemudian untuk menghentikan konsumen, kirimkan ke sana pesan quit
:
$ php amqp_publisher.php berhenti
Jika Anda perlu mendengarkan soket yang digunakan untuk terhubung ke RabbitMQ maka lihat contoh di konsumen yang tidak memblokir.
$ php amqp_consumer_non_blocking.php
Silakan lihat Changelog untuk informasi lebih lanjut apa yang telah berubah baru -baru ini.
http://php-amqplib.github.io/php-amqplib/
Untuk tidak mengulangi diri kita sendiri, jika Anda ingin mempelajari lebih lanjut tentang perpustakaan ini, silakan merujuk ke tutorial RabbitMQ resmi.
amqp_ha_consumer.php
: Demo Penggunaan antrian cermin.
amqp_consumer_exclusive.php
dan amqp_publisher_exclusive.php
: Pertukaran fanout demo menggunakan antrian eksklusif.
amqp_consumer_fanout_{1,2}.php
dan amqp_publisher_fanout.php
: pertukaran fanout demo dengan antrian bernama.
amqp_consumer_pcntl_heartbeat.php
: Demos Penggunaan Pengirim Detak Jantung Berbasis Sinyal.
basic_get.php
: Demo mendapatkan pesan dari antrian dengan menggunakan panggilan AMQP BASIC Get .
Jika Anda memiliki sekelompok beberapa node yang dapat dihubungkan oleh aplikasi Anda, Anda dapat memulai koneksi dengan array host. Untuk melakukan itu, Anda harus menggunakan metode statis create_connection
.
Misalnya:
$ connection = AMQPStreamConnection :: create_connection ([ ['host' => host1, 'port' => port, 'user' => user, 'password' => pass, 'vhost' => vhost], ['host' => host2, 'port' => port, 'user' => user, 'password' => pass, 'vhost' => vhost] ], $ options);
Kode ini akan mencoba untuk terhubung ke HOST1
terlebih dahulu, dan terhubung ke HOST2
jika koneksi pertama gagal. Metode ini mengembalikan objek koneksi untuk koneksi sukses pertama. Jika semua koneksi gagal, itu akan melempar pengecualian dari upaya koneksi terakhir.
Lihat demo/amqp_connect_multiple_hosts.php
untuk lebih banyak contoh.
Katakanlah Anda memiliki proses yang menghasilkan banyak pesan yang akan dipublikasikan ke exchange
yang sama menggunakan routing_key
yang sama dan opsi seperti mandatory
. Maka Anda dapat memanfaatkan fitur perpustakaan batch_basic_publish
. Anda dapat mengumpulkan pesan seperti ini:
$ msg = AMQPMessage baru ($ msg_body); $ ch-> Batch_basic_publish ($ msg, $ Exchange); $ msg2 = AMQPMessage baru ($ msg_body); $ ch-> Batch_basic_publish ($ msg2, $ Exchange);
dan kemudian kirim batch seperti ini:
$ ch-> publish_batch ();
Katakanlah program kami perlu dibaca dari file dan kemudian menerbitkan satu pesan per baris. Bergantung pada ukuran pesan, Anda harus memutuskan kapan lebih baik mengirim batch. Anda dapat mengirimkannya setiap 50 pesan, atau setiap seratus. Terserah Anda.
Cara lain untuk mempercepat penerbitan pesan Anda adalah dengan menggunakan kembali contoh pesan AMQPMessage
. Anda dapat membuat pesan baru seperti ini:
$ properties = array ('content_type' => 'text/polos', 'Delivery_mode' => amqpmessage :: Delivery_mode_persistent); $ msg = AMQPMessage baru ($ body, $ properties); $ ch-> basic_publish ($ msg, $, $, $ menukarkan);
Sekarang katakanlah bahwa saat Anda ingin mengubah badan pesan untuk pesan mendatang, Anda akan menyimpan properti yang sama, yaitu, pesan Anda masih akan menjadi text/plain
dan delivery_mode
masih akan menjadi AMQPMessage::DELIVERY_MODE_PERSISTENT
. Jika Anda membuat instance AMQPMessage
baru untuk setiap pesan yang diterbitkan, maka properti-properti itu harus dikodekan ulang dalam format biner AMQP. Anda dapat menghindari semua itu dengan hanya menggunakan kembali AMQPMessage
dan kemudian mengatur ulang badan pesan seperti ini:
$ msg-> setbody ($ body2); $ ch-> basic_publish ($ msg, $ exchange);
AMQP tidak membebankan batasan pada ukuran pesan; Jika pesan yang sangat besar diterima oleh konsumen, batas memori PHP dapat dicapai dalam perpustakaan sebelum panggilan balik diteruskan ke basic_consume
dipanggil.
Untuk menghindari ini, Anda dapat memanggil metode AMQPChannel::setBodySizeLimit(int $bytes)
pada instance saluran Anda. Ukuran tubuh yang melebihi batas ini akan terpotong, dan dikirim ke panggilan balik Anda dengan AMQPMessage::$is_truncated
Flag yang diatur ke true
. Properti AMQPMessage::$body_size
akan mencerminkan ukuran tubuh yang sebenarnya dari pesan yang diterima, yang akan lebih tinggi dari strlen(AMQPMessage::getBody())
jika pesan telah dipotong.
Perhatikan bahwa semua data di atas batas dibaca dari saluran AMQP dan segera dibuang, jadi tidak ada cara untuk mengambilnya dalam panggilan balik Anda. Jika Anda memiliki konsumen lain yang dapat menangani pesan dengan muatan yang lebih besar, Anda dapat menggunakan basic_reject
atau basic_nack
untuk memberi tahu server (yang masih memiliki salinan lengkap) untuk meneruskannya ke pertukaran surat mati.
Secara default, tidak ada pemotongan yang akan terjadi. Untuk menonaktifkan pemotongan pada saluran yang telah diaktifkan, lulus 0
(atau null
) ke AMQPChannel::setBodySizeLimit()
.
Beberapa klien RabbitMQ yang menggunakan mekanisme pemulihan koneksi otomatis untuk menyambung kembali dan memulihkan saluran dan konsumen jika terjadi kesalahan jaringan.
Karena klien ini menggunakan satu thread, Anda dapat mengatur pemulihan koneksi menggunakan mekanisme penanganan pengecualian.
Pengecualian yang mungkin dilemparkan jika terjadi kesalahan koneksi:
PhpamqPlibExceptionAmqPconnectionClosedExceptionphpamqPlibExceptionAmqpioExceptionrunTimeExceptionerrorException
Beberapa pengecualian lain mungkin dilemparkan, tetapi koneksi masih bisa ada di sana. Itu selalu merupakan ide yang baik untuk membersihkan koneksi lama saat menangani pengecualian sebelum terhubung kembali.
Misalnya, jika Anda ingin mengatur koneksi pemulihan:
$ connection = null; $ channel = null; while (true) {try {$ connection = new AMQPStreamConnection (host, port, user, pass, vhost); // kode aplikasi Anda pergi ke sini.do_something_with_connection ($ connection); } catch (AMQPrunTimeException $ e) {echo $ e-> getMessage (); cleanup_connection ($ connection); usleep (wait_before_reconnect_us); } catch (runtimeException $ e) {cleanup_connection ($ connection); usleep (wait_before_reconnect_us); } catch (errorException $ e) {cleanup_connection ($ connection); usleep (wait_before_reconnect_us); } }
Contoh lengkap adalah dalam demo/connection_recovery_consume.php
.
Kode ini akan menghubungkan kembali dan mencoba lagi kode aplikasi setiap kali pengecualian terjadi. Beberapa pengecualian masih dapat dilemparkan dan tidak boleh ditangani sebagai bagian dari proses penyambungan kembali, karena mereka mungkin kesalahan aplikasi.
Pendekatan ini sebagian besar masuk akal untuk aplikasi konsumen, produsen akan memerlukan beberapa kode aplikasi tambahan untuk menghindari menerbitkan pesan yang sama beberapa kali.
Ini adalah contoh paling sederhana, dalam aplikasi kehidupan nyata Anda mungkin ingin mengontrol jumlah retr dan mungkin dengan anggun menurunkan waktu tunggu untuk menyambung kembali.
Anda dapat menemukan contoh yang lebih berlebihan di #444
Jika Anda telah menginstal pengiriman ekstensi PCNTL dari sinyal akan ditangani ketika konsumen tidak memproses pesan.
$ pcntlHandler = fungsi ($ sinyal) {switch ($ sinyal) {case sigterm: case sigusr1: case sigint: // beberapa barang sebelum berhenti konsumen misalnya menghapus kunci etcpcntl_signal ($ sinyal, sig_dfl); // kembalikan handlerposix_kill (posix_getpid (), $ sinyal); // Bunuh diri dengan sinyal, lihat https://www.cons.org/cracauer/sigint.htmlcase sighup: // beberapa hal untuk memulai kembali konsumen; default: // lakukan apa -apa} }; pcntl_signal (sigterm, $ pcntlHandler); pcntl_signal (sigint, $ pcntlhandler); pcntl_signal (sigusr1, $ pcntlHandler); pcntl_signal (sighup, $ pcntlhandler);
Untuk menonaktifkan fitur ini cukup tentukan AMQP_WITHOUT_SIGNALS
yang konstan sebagai true
<? phpdefine ('amqp_without_signals', true); ... lebih banyak kode
Jika Anda telah menginstal ekstensi PCNTL dan menggunakan PHP 7.1 atau lebih besar, Anda dapat mendaftarkan pengirim detak jantung berbasis sinyal.
<? php $ pengirim = pcntlheartbeatsender baru ($ connection); $ sender-> register (); ... kode $ sender-> unregister ();
Jika Anda ingin tahu apa yang terjadi di tingkat protokol maka tambahkan konstanta berikut ke kode Anda:
<? phpdefine ('amqp_debug', true); ... lebih banyak kode?>
Untuk menjalankan jenis tolok ukur penerbitan/konsumsi:
$ Make Benchmark
Silakan lihat berkontribusi untuk detailnya.
Jika Anda masih ingin menggunakan versi lama dari protokol maka Anda dapat melakukannya dengan mengatur konstanta berikut dalam kode konfigurasi Anda:
define ('AMQP_PROTOCOL', '0.8');
Nilai defaultnya adalah '0.9.1'
.
Jika karena alasan tertentu Anda tidak ingin menggunakan komposer, maka Anda harus memiliki autoloader di tempat untuk kelas perpustakaan. Orang -orang telah melaporkan menggunakan autoloader ini dengan sukses.
Di bawah ini adalah konten file readme asli. Kredit diberikan kepada penulis asli.
Perpustakaan PHP Menerapkan Protokol Antrian Pesan Lanjutan (AMQP).
Perpustakaan adalah port Python Code of Py-AMQPLIB http://barryp.org/software/py-amqplib/
Ini telah diuji dengan server RabbitMQ.
Halaman Beranda Proyek: http://code.google.com/p/php-amqplib/
Untuk diskusi, silakan bergabung dengan grup:
http://groups.google.com/group/php-amqplib-devel
Untuk laporan bug, silakan gunakan sistem pelacakan bug di halaman proyek.
Patch sangat disambut!
Penulis: vadim zaliva [email protected]