RabbitMqBundle
menggabungkan perpesanan dalam aplikasi Anda melalui RabbitMQ menggunakan perpustakaan php-amqplib.
Bundel ini mengimplementasikan beberapa pola pesan seperti yang terlihat di perpustakaan Thumper. Oleh karena itu, mempublikasikan pesan ke RabbitMQ dari pengontrol Symfony semudah:
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
Nanti ketika Anda ingin menggunakan 50 pesan dari antrian upload_pictures
, Anda tinggal menjalankannya di CLI:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Semua contoh mengharapkan server RabbitMQ yang berjalan.
Bundel ini dipresentasikan pada konferensi Symfony Live Paris 2011. Lihat slidenya di sini.
Karena perubahan yang dapat menyebabkan gangguan yang disebabkan oleh Symfony >=4.4, tag baru dirilis, membuat bundel tersebut kompatibel dengan Symfony >=4.4.
Memerlukan bundel dan dependensinya dengan composer:
$ composer require php-amqplib/rabbitmq-bundle
Daftarkan bundelnya:
// app/AppKernel.php
public function registerBundles ()
{
$ bundles = array (
new OldSound RabbitMqBundle OldSoundRabbitMqBundle (),
);
}
Menikmati !
Jika Anda memiliki aplikasi konsol yang digunakan untuk menjalankan konsumen RabbitMQ, Anda tidak memerlukan Symfony HttpKernel dan FrameworkBundle. Mulai versi 1.6, Anda dapat menggunakan komponen Injeksi Ketergantungan untuk memuat konfigurasi dan layanan bundel ini, lalu menggunakan perintah konsumen.
Memerlukan bundel di file composer.json Anda:
{
"require": {
"php-amqplib/rabbitmq-bundle": "^2.0",
}
}
Daftarkan ekstensi dan pass compiler:
use OldSound RabbitMqBundle DependencyInjection OldSoundRabbitMqExtension ;
use OldSound RabbitMqBundle DependencyInjection Compiler RegisterPartsPass ;
// ...
$ containerBuilder -> registerExtension ( new OldSoundRabbitMqExtension ());
$ containerBuilder -> addCompilerPass ( new RegisterPartsPass ());
Sejak 04-06-2012 Beberapa opsi default untuk bursa yang dinyatakan di bagian konfigurasi "produsen" telah diubah agar sesuai dengan default bursa yang dinyatakan di bagian "konsumen". Pengaturan yang terpengaruh adalah:
durable
diubah dari false
menjadi true
,auto_delete
diubah dari true
menjadi false
.Konfigurasi Anda harus diperbarui jika Anda mengandalkan nilai default sebelumnya.
Sejak 24-04-2012 Tanda tangan metode ConsumerInterface::execute telah berubah
Sejak 03-01-2012, metode yang dijalankan konsumen mendapatkan seluruh objek pesan AMQP dan bukan hanya isi pesan. Lihat file CHANGELOG untuk lebih jelasnya.
Tambahkan bagian old_sound_rabbit_mq
di file konfigurasi Anda:
old_sound_rabbit_mq :
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' / '
lazy : false
connection_timeout : 3
read_write_timeout : 3
# the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
channel_rpc_timeout : 0.0
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive : false
# requires php-amqplib v2.4.1+
heartbeat : 0
# requires php_sockets.dll
use_socket : true # default false
login_method : ' AMQPLAIN ' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
another :
# A different (unused) connection defined by an URL. One can omit all parts,
# except the scheme (amqp:). If both segment in the URL and a key value (see above)
# are given the value from the URL takes precedence.
# See https://www.rabbitmq.com/uri-spec.html on how to encode values.
url : ' amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6 '
producers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
service_alias : my_app_service # no alias by default
default_routing_key : ' optional.routing.key ' # defaults to '' if not set
default_content_type : ' content/type ' # defaults to 'text/plain'
default_delivery_mode : 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
options :
no_ack : false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.
Di sini kita mengkonfigurasi layanan koneksi dan titik akhir pesan yang akan dimiliki aplikasi kita. Dalam contoh ini penampung layanan Anda akan berisi layanan old_sound_rabbit_mq.upload_picture_producer
dan old_sound_rabbit_mq.upload_picture_consumer
. Nanti mengharapkan ada layanan bernama upload_picture_service
.
Jika Anda tidak menentukan koneksi untuk klien, klien akan mencari koneksi dengan alias yang sama. Jadi untuk upload_picture
kita, penampung layanan akan mencari koneksi upload_picture
.
Jika Anda perlu menambahkan argumen antrian opsional, maka opsi antrian Anda bisa seperti ini:
queue_options : {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}
contoh lain dengan pesan TTL 20 detik:
queue_options : {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}
Nilai argumen harus berupa daftar tipe data dan nilai. Tipe data yang valid adalah:
S
- TaliI
- bilangan bulatD
- DesimalT
- Stempel waktuF
- TabelA
- Himpunant
- Bodoh Sesuaikan arguments
sesuai dengan kebutuhan Anda.
Jika Anda ingin mengikat antrian dengan kunci perutean tertentu, Anda dapat mendeklarasikannya di konfigurasi produsen atau konsumen:
queue_options :
name : " upload-picture "
routing_keys :
- ' android.#.upload '
- ' iphone.upload '
Dalam lingkungan Symfony semua layanan di-bootstrap sepenuhnya untuk setiap permintaan, mulai versi >= 4.3 Anda dapat mendeklarasikan suatu layanan sebagai layanan malas (Lazy Services). Bundel ini masih belum mendukung fitur Layanan Malas yang baru tetapi Anda dapat mengatur lazy: true
dalam konfigurasi koneksi Anda untuk menghindari koneksi yang tidak perlu ke perantara pesan Anda di setiap permintaan. Sangat disarankan untuk menggunakan koneksi lambat karena alasan kinerja, namun opsi malas dinonaktifkan secara default untuk menghindari kemungkinan kerusakan pada aplikasi yang sudah menggunakan bundel ini.
Sebaiknya atur read_write_timeout
menjadi 2x detak jantung agar soket Anda terbuka. Jika Anda tidak melakukan ini, atau menggunakan pengganda yang berbeda, ada risiko soket konsumen akan habis waktunya.
Harap diingat, bahwa Anda dapat mengharapkan masalah, jika tugas Anda umumnya berjalan lebih lama dari periode detak jantung, yang tidak ada solusi yang baik (tautan). Pertimbangkan untuk menggunakan nilai yang besar untuk detak jantung atau biarkan detak jantung dinonaktifkan demi tcp keepalive
(baik di sisi klien dan server) dan fitur graceful_max_execution_timeout
.
Anda dapat menyediakan beberapa host untuk sebuah koneksi. Ini akan memungkinkan Anda untuk menggunakan cluster RabbitMQ dengan banyak node.
old_sound_rabbit_mq :
connections :
default :
hosts :
- host : host1
port : 3672
user : user1
password : password1
vhost : vhost1
- url : ' amqp://guest:password@localhost:5672/vhost '
connection_timeout : 3
read_write_timeout : 3
Perhatikan apa yang tidak bisa Anda tentukan
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
parameter ke setiap host secara terpisah.
Terkadang informasi koneksi Anda mungkin perlu bersifat dinamis. Parameter koneksi dinamis memungkinkan Anda menyediakan atau mengganti parameter secara terprogram melalui layanan.
misalnya Dalam skenario ketika parameter vhost
koneksi bergantung pada penyewa aplikasi berlabel putih Anda saat ini dan Anda tidak ingin (atau tidak bisa) mengubah konfigurasinya setiap saat.
Tentukan layanan pada connection_parameters_provider
yang mengimplementasikan ConnectionParametersProviderInterface
, dan tambahkan ke konfigurasi connections
yang sesuai.
connections :
default :
host : ' localhost '
port : 5672
user : ' guest '
password : ' guest '
vhost : ' foo ' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider : connection_parameters_provider_service
Contoh Implementasi:
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters () {
return array ( ' vhost ' => $ this -> getVhost ());
}
. . .
}
Dalam hal ini, parameter vhost
akan diganti dengan keluaran getVhost()
.
Dalam aplikasi perpesanan, proses pengiriman pesan ke broker disebut produsen sedangkan proses menerima pesan tersebut disebut konsumen . Dalam aplikasi Anda, Anda akan memiliki beberapa di antaranya yang dapat Anda daftarkan di bawah entri masing-masing dalam konfigurasi.
Seorang produser akan digunakan untuk mengirim pesan ke server. Dalam Model AMQP, pesan dikirim ke bursa , ini berarti bahwa dalam konfigurasi untuk produsen Anda harus menentukan opsi koneksi bersama dengan opsi bursa, yang biasanya berupa nama bursa dan jenisnya.
Sekarang katakanlah Anda ingin memproses unggahan gambar di latar belakang. Setelah Anda memindahkan gambar ke lokasi akhirnya, Anda akan mempublikasikan pesan ke server dengan informasi berikut:
public function indexAction ( $ name )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ));
}
Seperti yang Anda lihat, jika dalam konfigurasi Anda memiliki produser bernama upload_picture , maka dalam wadah layanan Anda akan memiliki layanan bernama old_sound_rabbit_mq.upload_picture_producer .
Selain pesan itu sendiri, metode OldSoundRabbitMqBundleRabbitMqProducer#publish()
juga menerima parameter kunci perutean opsional dan array opsional properti tambahan. Array properti tambahan memungkinkan Anda mengubah properti yang digunakan untuk membuat objek PhpAmqpLibMessageAMQPMessage
secara default. Dengan cara ini, misalnya, Anda dapat mengubah header aplikasi.
Anda dapat menggunakan metode setContentType dan setDeliveryMode untuk mengatur masing-masing jenis konten pesan dan mode pengiriman pesan, mengesampingkan set default apa pun di bagian konfigurasi "produsen". Jika tidak diganti oleh konfigurasi "produsen" atau panggilan eksplisit ke metode ini (seperti contoh di bawah), nilai defaultnya adalah text/plain untuk tipe konten dan 2 untuk mode pengiriman.
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> setContentType ( ' application/json ' );
Jika Anda perlu menggunakan kelas khusus untuk produser (yang seharusnya mewarisi dari OldSoundRabbitMqBundleRabbitMqProducer
), Anda dapat menggunakan opsi class
:
...
producers :
upload_picture :
class : MyCustomProducer
connection : default
exchange_options : {name: 'upload-picture', type: direct}
...
Bagian selanjutnya dari teka-teki ini adalah memiliki konsumen yang akan mengeluarkan pesan dari antrian dan memprosesnya sesuai dengan itu.
Saat ini ada dua acara yang dikeluarkan oleh produser.
Peristiwa ini terjadi segera sebelum pesan dipublikasikan. Ini adalah cara yang bagus untuk melakukan pencatatan akhir, validasi, dll. sebelum benar-benar mengirim pesan. Contoh implementasi pendengar:
namespace App EventListener ;
use OldSound RabbitMqBundle Event BeforeProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: BeforeProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( BeforeProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Peristiwa ini terjadi segera setelah pesan dipublikasikan. Ini adalah cara yang bagus untuk melakukan pencatatan konfirmasi, penerapan, dll. setelah benar-benar mengirim pesan. Contoh implementasi pendengar:
namespace App EventListener ;
use OldSound RabbitMqBundle Event AfterProducerPublishMessageEvent ;
use Symfony Component EventDispatcher Attribute AsEventListener ;
#[AsEventListener(event: AfterProducerPublishMessageEvent:: NAME )]
final class AMQPBeforePublishEventListener
{
public function __invoke ( AfterProducerPublishMessageEvent $ event ): void
{
// Your code goes here
}
}
Konsumen akan terhubung ke server dan memulai loop menunggu pesan masuk diproses. Bergantung pada panggilan balik yang ditentukan untuk konsumen tersebut akan menjadi perilaku yang akan dimilikinya. Mari kita tinjau konfigurasi konsumen dari atas:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
Seperti yang kita lihat di sana, opsi panggilan balik memiliki referensi ke upload_picture_service . Ketika konsumen mendapat pesan dari server, ia akan menjalankan panggilan balik tersebut. Jika untuk tujuan pengujian atau debugging Anda perlu menentukan callback yang berbeda, Anda dapat mengubahnya di sana.
Selain callback kita juga menentukan koneksi yang akan digunakan, sama seperti yang kita lakukan dengan producer . Opsi yang tersisa adalah exchange_options dan queue_options . Exchange_options harus sama dengan yang digunakan untuk producer . Di queue_options kami akan memberikan nama antrian . Mengapa?
Seperti yang kami katakan, pesan di AMQP dipublikasikan ke bursa . Ini tidak berarti pesan telah mencapai antrian . Agar hal ini terjadi, pertama-tama kita perlu membuat antrian tersebut dan kemudian mengikatnya ke exchange . Kerennya lagi, Anda bisa mengikat beberapa antrian ke satu exchange , sehingga satu pesan bisa sampai ke beberapa tujuan. Keuntungan dari pendekatan ini adalah pemisahan antara produsen dan konsumen. Produsen tidak peduli berapa banyak konsumen yang akan memproses pesannya. Yang diperlukan hanyalah pesannya sampai ke server. Dengan cara ini kita dapat memperluas tindakan yang kita lakukan setiap kali gambar diunggah tanpa perlu mengubah kode di pengontrol kita.
Sekarang, bagaimana cara menjalankan konsumen? Ada perintah untuk itu yang bisa dijalankan seperti ini:
$ ./app/console rabbitmq:consumer -m 50 upload_picture
Apa artinya ini? Kami mengeksekusi konsumen upload_picture yang memerintahkannya untuk hanya menggunakan 50 pesan. Setiap kali konsumen menerima pesan dari server, konsumen akan mengeksekusi callback yang dikonfigurasi yang meneruskan pesan AMQP sebagai turunan dari kelas PhpAmqpLibMessageAMQPMessage
. Isi pesan dapat diperoleh dengan memanggil $msg->body
. Secara default konsumen akan memproses pesan dalam loop tak berujung untuk beberapa definisi tak berujung .
Jika Anda ingin memastikan bahwa konsumen akan menyelesaikan eksekusi secara instan pada sinyal Unix, Anda dapat menjalankan perintah dengan flag -w
.
$ ./app/console rabbitmq:consumer -w upload_picture
Kemudian konsumen akan menyelesaikan eksekusi secara instan.
Untuk menggunakan perintah dengan flag ini Anda perlu menginstal PHP dengan ekstensi PCNTL.
Jika Anda ingin menetapkan batas memori konsumen, Anda dapat melakukannya dengan menggunakan flag -l
. Pada contoh berikut, tanda ini menambahkan batas memori 256 MB. Konsumen akan dihentikan lima MB sebelum mencapai 256MB untuk menghindari kesalahan ukuran memori PHP Diizinkan.
$ ./app/console rabbitmq:consumer -l 256
Jika Anda ingin menghapus semua pesan yang menunggu dalam antrian, Anda dapat menjalankan perintah ini untuk membersihkan antrian ini:
$ ./app/console rabbitmq:purge --no-confirmation upload_picture
Untuk menghapus antrian konsumen, gunakan perintah ini:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
Hal ini dapat berguna dalam banyak skenario. Ada 3 Acara AMQPE:
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_CONSUME ;
/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
}
}
s say you need to sleep / stop consumer/s on a new application deploy. You can listen for
OldSoundRabbitMqBundleEventOnConsumeEvent` dan memeriksa penerapan aplikasi baru.
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: BEFORE_PROCESSING_MESSAGE ;
/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Acara dimunculkan sebelum memproses AMQPMessage
.
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent:: AFTER_PROCESSING_MESSAGE ;
/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer , AMQPMessage $ AMQPMessage )
{
$ this -> setConsumer ( $ consumer );
$ this -> setAMQPMessage ( $ AMQPMessage );
}
}
Acara dimunculkan setelah memproses AMQPMessage
. Jika pesan proses akan memunculkan Pengecualian, acara tersebut tidak akan dimunculkan.
<?php
class OnIdleEvent extends AMQPEvent
{
const NAME = AMQPEvent:: ON_IDLE ;
/**
* OnIdleEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct ( Consumer $ consumer )
{
$ this -> setConsumer ( $ consumer );
$ this -> forceStop = true ;
}
}
Peristiwa muncul ketika metode wait
keluar berdasarkan waktu habis tanpa menerima pesan. Untuk memanfaatkan kejadian ini, idle_timeout
konsumen harus dikonfigurasi. Secara default, proses keluar pada waktu tunggu habis, Anda dapat mencegahnya dengan menyetel $event->setForceStop(false)
di pendengar.
Jika Anda perlu menyetel batas waktu saat tidak ada pesan dari antrean selama jangka waktu tertentu, Anda dapat menyetel idle_timeout
dalam hitungan detik. idle_timeout_exit_code
menentukan kode keluar apa yang harus dikembalikan oleh konsumen ketika waktu tunggu idle terjadi. Tanpa menentukannya, konsumen akan memunculkan pengecualian PhpAmqpLibExceptionAMQPTimeoutException
.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
Tetapkan timeout_wait
dalam hitungan detik. timeout_wait
menentukan berapa lama konsumen akan menunggu tanpa menerima pesan baru sebelum memastikan koneksi saat ini masih valid.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
idle_timeout : 60
idle_timeout_exit_code : 0
timeout_wait : 10
Jika Anda ingin konsumen Anda berjalan hingga waktu tertentu dan kemudian keluar dengan baik, maka atur graceful_max_execution.timeout
dalam hitungan detik. "Keluar dengan anggun" berarti, konsumen akan keluar setelah tugas yang sedang berjalan atau segera, ketika menunggu tugas baru. graceful_max_execution.exit_code
menentukan kode keluar apa yang harus dikembalikan oleh konsumen ketika batas waktu eksekusi maksimal yang anggun terjadi. Tanpa menentukannya, konsumen akan keluar dengan status 0
.
Fitur ini sangat bagus jika digabungkan dengan supervisord, yang bersama-sama memungkinkan pembersihan kebocoran memori secara berkala, koneksi dengan pembaruan database/rabbitmq, dan banyak lagi.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
graceful_max_execution :
timeout : 1800 # 30 minutes
exit_code : 10 # default is 0
Anda mungkin telah memperhatikan bahwa pengiriman masih tidak berjalan sesuai keinginan kita. Misalnya dalam situasi dengan dua pekerja, ketika semua pesan ganjil berat dan pesan genap ringan, satu pekerja akan selalu sibuk dan pekerja lainnya hampir tidak melakukan pekerjaan apa pun. Ya, RabbitMQ tidak tahu apa-apa tentang itu dan masih akan mengirimkan pesan secara merata.
Hal ini terjadi karena RabbitMQ baru mengirimkan pesan ketika pesan tersebut masuk ke antrian. Itu tidak melihat jumlah pesan yang tidak diakui oleh konsumen. Itu hanya mengirimkan secara membabi buta setiap pesan ke-n ke konsumen ke-n.
Untuk mengatasinya kita bisa menggunakan metode basic.qos dengan setting prefetch_count=1. Ini memberitahu RabbitMQ untuk tidak memberikan lebih dari satu pesan kepada seorang pekerja dalam satu waktu. Atau, dengan kata lain, jangan mengirimkan pesan baru ke pekerja sampai pekerja tersebut memproses dan mengakui pesan sebelumnya. Sebaliknya, ia akan mengirimkannya ke pekerja berikutnya yang tidak masih sibuk.
Dari: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
Berhati-hatilah saat menerapkan pengiriman adil yang menimbulkan latensi yang akan mengganggu kinerja (lihat posting blog ini). Namun penerapannya memungkinkan Anda melakukan penskalaan secara horizontal dan dinamis seiring bertambahnya antrian. Anda harus mengevaluasi, seperti yang direkomendasikan oleh postingan blog, nilai prefetch_size yang tepat sesuai dengan waktu yang dibutuhkan untuk memproses setiap pesan dan kinerja jaringan Anda.
Dengan RabbitMqBundle, Anda dapat mengonfigurasi qos_options per konsumen seperti itu:
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
Jika digunakan dengan bundel Symfony 4.2+ yang dideklarasikan dalam kumpulan wadah alias untuk produsen dan konsumen biasa. Itu digunakan untuk argumen pengkabelan otomatis berdasarkan tipe yang dideklarasikan dan nama argumen. Ini memungkinkan Anda mengubah contoh produser sebelumnya menjadi:
public function indexAction ( $ name , ProducerInterface $ uploadPictureProducer )
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ uploadPictureProducer -> publish ( serialize ( $ msg ));
}
Nama argumen dibangun dari nama produsen atau konsumen dari konfigurasi dan diakhiri dengan kata produsen atau konsumen sesuai jenisnya. Berbeda dengan konvensi penamaan barang kontainer, akhiran kata (produsen atau konsumen) tidak akan terduplikasi jika nama sudah diberi akhiran. kunci produser upload_picture
akan diubah menjadi nama argumen $uploadPictureProducer
. kunci produser upload_picture_producer
juga akan diberi nama argumen $uploadPictureProducer
. Sebaiknya hindari nama yang serupa dengan cara seperti itu.
Semua produser memiliki alias ke OldSoundRabbitMqBundleRabbitMqProducerInterface
dan opsi kelas produser dari konfigurasi. Pada mode sandbox hanya alias ProducerInterface
yang dibuat. Sangat disarankan untuk menggunakan kelas ProducerInterface
ketika mengetikkan argumen petunjuk untuk injeksi produser.
Semua konsumen diberi alias ke nilai opsi konfigurasi OldSoundRabbitMqBundleRabbitMqConsumerInterface
dan %old_sound_rabbit_mq.consumer.class%
. Tidak ada perbedaan antara mode biasa dan mode sandbox. Sangat disarankan untuk menggunakan ConsumerInterface
saat mengetikkan argumen petunjuk untuk injeksi klien.
Berikut ini contoh panggilan balik:
<?php
//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php
namespace Acme DemoBundle Consumer ;
use OldSound RabbitMqBundle RabbitMq ConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class UploadPictureConsumer implements ConsumerInterface
{
public function execute ( AMQPMessage $ msg )
{
//Process picture upload.
//$msg will be an instance of `PhpAmqpLibMessageAMQPMessage` with the $msg->body being the data sent over RabbitMQ.
$ isUploadSuccess = someUploadPictureMethod ();
if (! $ isUploadSuccess ) {
// If your image upload failed due to a temporary error you can return false
// from your callback so the message will be rejected by the consumer and
// requeued by RabbitMQ.
// Any other value not equal to false will acknowledge the message and remove it
// from the queue
return false ;
}
}
}
Seperti yang Anda lihat, ini semudah menerapkan satu metode: ConsumerInterface::execute .
Ingatlah bahwa panggilan balik Anda harus didaftarkan sebagai layanan Symfony biasa. Di sana Anda dapat menyuntikkan wadah layanan, layanan database, logger Symfony, dan sebagainya.
Lihat https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md untuk detail selengkapnya tentang bagian dari instance pesan.
Untuk menghentikan konsumen, callback dapat menampilkan StopConsumerException
(pesan yang terakhir digunakan tidak akan di-ack) atau AckStopConsumerException
(pesan akan di-ack). Jika menggunakan demonized, ex: supervisor, konsumen akan benar-benar restart.
Tampaknya ini merupakan pekerjaan yang cukup berat hanya untuk mengirim pesan, mari kita rekap untuk mendapatkan gambaran yang lebih baik. Inilah yang kita perlukan untuk menghasilkan/mengkonsumsi pesan:
Dan itu saja!
Ini adalah persyaratan untuk memiliki keterlacakan pesan yang diterima/diterbitkan. Untuk mengaktifkannya, Anda perlu menambahkan konfigurasi enable_logger
ke konsumen atau penerbit.
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture'}
callback : upload_picture_service
enable_logger : true
Jika mau, Anda juga dapat menangani logging dari antrian dengan penangan berbeda di monolog, dengan merujuk saluran phpamqplib
.
Selama ini kita baru mengirimkan pesan kepada konsumen, namun bagaimana jika kita ingin mendapat balasan dari mereka? Untuk mencapai hal ini kita harus mengimplementasikan panggilan RPC ke dalam aplikasi kita. Bundel ini membuatnya cukup mudah untuk mencapai hal-hal tersebut dengan Symfony.
Mari tambahkan klien dan server RPC ke dalam konfigurasi:
rpc_clients :
integer_store :
connection : default # default: default
unserializer : json_decode # default: unserialize
lazy : true # default: false
direct_reply_to : false
rpc_servers :
random_int :
connection : default
callback : random_int_server
qos_options : {prefetch_size: 0, prefetch_count: 1, global: false}
exchange_options : {name: random_int, type: topic}
queue_options : {name: random_int_queue, durable: false, auto_delete: true}
serializer : json_encode
Untuk referensi konfigurasi lengkap silakan gunakan perintah php app/console config:dump-reference old_sound_rabbit_mq
.
Di sini kita memiliki server yang sangat berguna: ia mengembalikan bilangan bulat acak ke kliennya. Callback yang digunakan untuk memproses permintaan adalah layanan random_int_server . Sekarang mari kita lihat cara menjalankannya dari pengontrol kita.
Pertama kita harus memulai server dari baris perintah:
$ ./app/console_dev rabbitmq:rpc-server random_int
Dan kemudian tambahkan kode berikut ke pengontrol kami:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' request_id ' );
$ replies = $ client -> getReplies ();
}
Seperti yang Anda lihat di sana, jika id klien kami adalah integer_store , maka nama layanannya adalah old_sound_rabbit_mq.integer_store_rpc . Setelah kita mendapatkan objek tersebut, kita mengajukan permintaan pada server dengan memanggil addRequest
yang mengharapkan tiga parameter:
Argumen yang kami kirimkan adalah nilai min dan max untuk fungsi rand()
. Kami mengirimkannya dengan membuat serialisasi array. Jika server kami mengharapkan informasi JSON, atau XML, kami akan mengirimkan data tersebut ke sini.
Bagian terakhir adalah mendapatkan balasannya. Skrip PHP kami akan memblokir hingga server mengembalikan nilai. Variabel $replies akan berupa array asosiatif dimana setiap balasan dari server akan dimuat dalam kunci request_id masing-masing.
Secara default, Klien RPC mengharapkan respons diserialkan. Jika server yang Anda gunakan mengembalikan hasil non-serial, maka atur opsi klien expect_serialized_response
ke false. Misalnya, jika server integer_store tidak membuat serial, hasilnya klien akan ditetapkan seperti di bawah ini:
rpc_clients :
integer_store :
connection : default
expect_serialized_response : false
Anda juga dapat mengatur masa berlaku permintaan dalam milidetik, setelah itu pesan tidak akan lagi ditangani oleh server dan permintaan klien akan habis waktunya. Menyetel masa berlaku pesan hanya berfungsi untuk RabbitMQ 3.x dan yang lebih baru. Kunjungi http://www.rabbitmq.com/ttl.html#per-message-ttl untuk informasi lebih lanjut.
public function indexAction ( $ name )
{
$ expiration = 5000 ; // milliseconds
$ client = $ this -> get ( ' old_sound_rabbit_mq.integer_store_rpc ' );
$ client -> addRequest ( $ body , $ server , $ requestId , $ routingKey , $ expiration );
try {
$ replies = $ client -> getReplies ();
// process $replies['request_id'];
} catch ( PhpAmqpLib Exception AMQPTimeoutException $ e ) {
// handle timeout
}
}
Seperti yang bisa Anda tebak, kami juga dapat melakukan panggilan RPC paralel .
Katakanlah untuk merender beberapa halaman web, Anda perlu melakukan dua kueri basis data, satu memerlukan waktu 5 detik untuk menyelesaikannya dan yang lainnya memerlukan waktu 2 detik –kueri yang sangat mahal–. Jika Anda menjalankannya secara berurutan, maka halaman Anda akan siap ditayangkan dalam waktu sekitar 7 detik. Jika Anda menjalankannya secara paralel maka halaman Anda akan disajikan dalam waktu sekitar 5 detik. Dengan RabbitMqBundle
kita dapat melakukan panggilan paralel dengan mudah. Mari kita tentukan klien paralel di konfigurasi dan server RPC lainnya:
rpc_clients :
parallel :
connection : default
rpc_servers :
char_count :
connection : default
callback : char_count_server
random_int :
connection : default
callback : random_int_server
Maka kode ini harus masuk ke pengontrol kami:
public function indexAction ( $ name )
{
$ client = $ this -> get ( ' old_sound_rabbit_mq.parallel_rpc ' );
$ client -> addRequest ( $ name , ' char_count ' , ' char_count ' );
$ client -> addRequest ( serialize ( array ( ' min ' => 0 , ' max ' => 10 )), ' random_int ' , ' random_int ' );
$ replies = $ client -> getReplies ();
}
Sangat mirip dengan contoh sebelumnya, kita hanya memiliki panggilan addRequest
tambahan. Kami juga menyediakan pengidentifikasi permintaan yang bermakna sehingga nantinya akan lebih mudah bagi kami untuk menemukan balasan yang kami inginkan di array $replies .
Untuk mengaktifkan klien balasan langsung, Anda hanya perlu mengaktifkan opsi direct_reply_to pada konfigurasi rpc_clients untuk klien.
Opsi ini akan menggunakan pseudo-queue amq.rabbitmq.reply-to saat melakukan panggilan RPC. Pada server RPC tidak diperlukan modifikasi.
RabbitMQ memiliki implementasi antrian prioritas di inti pada versi 3.5.0. Antrean apa pun dapat diubah menjadi antrean prioritas menggunakan argumen opsional yang disediakan klien (namun, tidak seperti fitur lain yang menggunakan argumen opsional, bukan kebijakan). Implementasinya mendukung sejumlah prioritas terbatas: 255. Direkomendasikan nilai antara 1 dan 10. Periksa dokumentasi
inilah cara Anda mendeklarasikan antrian prioritas
consumers :
upload_picture :
connection : default
exchange_options : {name: 'upload-picture', type: direct}
queue_options : {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
callback : upload_picture_service
jika antrean upload-picture
ada sebelumnya, Anda harus menghapus antrean ini sebelum menjalankan perintah rabbitmq:setup-fabric
Sekarang katakanlah Anda ingin membuat pesan dengan prioritas tinggi, Anda harus mempublikasikan pesan tersebut dengan informasi tambahan ini
public function indexAction ()
{
$ msg = array ( ' user_id ' => 1235 , ' image_path ' => ' /path/to/new/pic.png ' );
$ additionalProperties = [ ' priority ' => 10 ] ;
$ routing_key = '' ;
$ this -> get ( ' old_sound_rabbit_mq.upload_picture_producer ' )-> publish ( serialize ( $ msg ), $ routing_key , $ additionalProperties );
}
Merupakan praktik yang baik untuk memiliki banyak antrian untuk pemisahan logika. Dengan konsumen sederhana Anda harus membuat satu pekerja (konsumen) per antrian dan ini mungkin sulit untuk dikelola ketika berhadapan dengan banyak evolusi (lupa menambahkan baris dalam konfigurasi supervisord Anda?). Hal ini juga berguna untuk antrean kecil karena Anda mungkin tidak ingin memiliki pekerja sebanyak antrean, dan ingin mengelompokkan kembali beberapa tugas tanpa kehilangan fleksibilitas dan prinsip pemisahan.
Banyak konsumen memungkinkan Anda menangani kasus penggunaan ini dengan mendengarkan beberapa antrean pada konsumen yang sama.
Berikut cara mengatur konsumen dengan banyak antrian:
multiple_consumers :
upload :
connection : default
exchange_options : {name: 'upload', type: direct}
queues_provider : queues_provider_service
queues :
upload-picture :
name : upload_picture
callback : upload_picture_service
routing_keys :
- picture
upload-video :
name : upload_video
callback : upload_video_service
routing_keys :
- video
upload-stats :
name : upload_stats
callback : upload_stats
Callback sekarang ditentukan di bawah setiap antrian dan harus mengimplementasikan ConsumerInterface
seperti konsumen sederhana. Semua pilihan queues-options
di konsumen tersedia untuk setiap antrian.
Sadarilah bahwa semua antrian berada di bawah pertukaran yang sama, terserah Anda untuk mengatur perutean yang benar untuk panggilan balik.
queues_provider
adalah layanan opsional yang menyediakan antrian secara dinamis. Itu harus mengimplementasikan QueuesProviderInterface
.
Ketahuilah bahwa penyedia antrean bertanggung jawab atas panggilan yang tepat ke setDequeuer
dan bahwa panggilan balik dapat dipanggil (bukan ConsumerInterface
). Jika antrian penyedia layanan mengimplementasikan DequeuerAwareInterface
, panggilan ke setDequeuer
ditambahkan ke definisi layanan dengan DequeuerInterface
yang saat ini menjadi MultipleConsumer
.
Anda mungkin menemukan bahwa aplikasi Anda memiliki alur kerja yang kompleks dan Anda perlu melakukan pengikatan sewenang-wenang. Skenario pengikatan sewenang-wenang mungkin mencakup pengikatan pertukaran ke pertukaran melalui properti destination_is_exchange
.
bindings :
- {exchange: foo, destination: bar, routing_key: 'baz.*' }
- {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}
Perintah Rabbitmq:setup-fabric akan mendeklarasikan pertukaran dan antrean sebagaimana ditentukan dalam konfigurasi produsen, konsumen, dan multikonsumen Anda sebelum membuat pengikatan sewenang-wenang. Namun, Rabbitmq:setup-fabric TIDAK akan mendeklarasikan antrian penambahan dan pertukaran yang ditentukan dalam binding. Terserah Anda untuk memastikan pertukaran/antrian diumumkan.
Terkadang Anda harus mengubah konfigurasi konsumen dengan cepat. Konsumen dinamis memungkinkan Anda menentukan opsi antrean konsumen secara terprogram, berdasarkan konteks.
misalnya Dalam skenario ketika konsumen yang ditentukan harus bertanggung jawab atas sejumlah topik yang dinamis dan Anda tidak ingin (atau tidak bisa) mengubah konfigurasinya setiap saat.
Tentukan layanan queue_options_provider
yang mengimplementasikan QueueOptionsProviderInterface
, dan tambahkan ke konfigurasi dynamic_consumers
Anda.
dynamic_consumers :
proc_logs :
connection : default
exchange_options : {name: 'logs', type: topic}
callback : parse_logs_service
queue_options_provider : queue_options_provider_service
Contoh Penggunaan:
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
Dalam hal ini konsumen proc_logs
menjalankan server1
dan dapat memutuskan opsi antrian yang digunakannya.
Sekarang, mengapa kita membutuhkan konsumen anonim? Ini terdengar seperti ancaman internet atau semacamnya… Teruslah membaca.
Di AMQP ada jenis pertukaran yang disebut topik di mana pesan diarahkan ke antrian berdasarkan –Anda dapat menebak– topik pesan. Kami dapat mengirim log tentang aplikasi kami ke pertukaran topik RabbiMQ menggunakan topik nama host tempat log dibuat dan tingkat keparahan log tersebut. Badan pesan akan menjadi konten log dan kunci peruteannya akan seperti ini:
Karena kita tidak ingin mengisi antrian dengan log yang tidak terbatas, apa yang bisa kita lakukan adalah ketika kita ingin memantau sistem, kita bisa meluncurkan konsumen yang membuat antrian dan melampirkan pertukaran log berdasarkan beberapa topik, misalnya , kami ingin melihat semua kesalahan yang dilaporkan oleh server kami. Kunci peruteannya akan seperti ini: #.error . Dalam kasus seperti ini kita harus membuat nama antrean, mengikatnya ke bursa, mengambil log, melepaskan ikatannya, dan menghapus antrean. Untungnya AMPQ menyediakan cara untuk melakukan ini secara otomatis jika Anda memberikan opsi yang tepat saat mendeklarasikan dan mengikat antrean. Masalahnya adalah Anda tidak ingin mengingat semua pilihan itu. Oleh karena itu kami menerapkan pola Konsumen Anonim .
Saat kami memulai Konsumen Anonim, detail tersebut akan ditangani dan kami hanya perlu memikirkan untuk menerapkan panggilan balik saat pesan tiba. Apakah disebut Anonymous karena tidak akan menentukan nama antrian, tetapi akan menunggu RabbitMQ untuk menetapkan nama acak ke dalamnya.
Sekarang, bagaimana cara mengkonfigurasi dan menjalankan konsumen tersebut?
anon_consumers :
logs_watcher :
connection : default
exchange_options : {name: 'app-logs', type: topic}
callback : logs_watcher
Di sana kami menentukan nama pertukaran dan jenisnya beserta panggilan balik yang harus dijalankan ketika sebuah pesan tiba.
Konsumen Anonim ini sekarang dapat mendengarkan Produser, yang terhubung ke bursa yang sama dan berjenis topik :
producers :
app_logs :
connection : default
exchange_options : {name: 'app-logs', type: topic}
Untuk memulai Konsumen Anonim kami menggunakan perintah berikut:
$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r ' #.error ' logs_watcher
Satu-satunya opsi baru dibandingkan dengan perintah yang telah kita lihat sebelumnya adalah opsi yang menentukan kunci perutean : -r '#.error'
.
Dalam beberapa kasus, Anda ingin mendapatkan sekumpulan pesan dan kemudian melakukan beberapa pemrosesan pada semuanya. Konsumen batch akan memungkinkan Anda menentukan logika untuk jenis pemrosesan ini.
misalnya: Bayangkan Anda memiliki antrian di mana Anda menerima pesan untuk memasukkan beberapa informasi ke dalam database, dan Anda menyadari bahwa jika Anda melakukan penyisipan batch jauh lebih baik daripada memasukkan satu per satu.
Tentukan layanan panggilan balik yang mengimplementasikan BatchConsumerInterface
dan tambahkan definisi konsumen ke konfigurasi Anda.
batch_consumers :
batch_basic_consumer :
connection : default
exchange_options : {name: 'batch', type: fanout}
queue_options : {name: 'batch'}
callback : batch.basic
qos_options : {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait : 5
auto_setup_fabric : false
idle_timeout_exit_code : -2
keep_alive : false
graceful_max_execution :
timeout : 60
Catatan : Jika opsi keep_alive
disetel ke true
, idle_timeout_exit_code
akan diabaikan dan proses konsumen dilanjutkan.
Anda dapat menerapkan konsumen batch yang akan menerima semua pesan dalam satu pengembalian atau Anda dapat memiliki kendali atas pesan apa yang harus dikonfirmasi.
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
foreach ( $ messages as $ message ) {
$ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack all messages got in batch
return true ;
}
}
namespace AppBundle Service ;
use OldSound RabbitMqBundle RabbitMq BatchConsumerInterface ;
use PhpAmqpLib Message AMQPMessage ;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute ( array $ messages )
{
echo sprintf ( ' Doing batch execution%s ' , PHP_EOL );
$ result = [];
/** @var AMQPMessage $message */
foreach ( $ messages as $ message ) {
$ result [ $ message -> getDeliveryTag ()] = $ this -> executeSomeLogicPerMessage ( $ message );
}
// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $ result ;
}
}
Cara menjalankan konsumen batch berikut:
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
Penting: BatchConsumers tidak akan menyediakan opsi -m|messages
Penting: BatchConsumers juga dapat menyediakan opsi -b|batches
jika Anda hanya ingin menggunakan sejumlah batch tertentu dan kemudian menghentikan konsumen. Berikan nomor batch hanya jika Anda ingin konsumen berhenti setelah pesan batch tersebut dikonsumsi!
Ada Perintah yang membaca data dari STDIN dan menerbitkannya ke antrian RabbitMQ. Untuk menggunakannya terlebih dahulu Anda harus mengkonfigurasi layanan producer
di file konfigurasi Anda seperti ini:
producers :
words :
connection : default
exchange_options : {name: 'words', type: direct}
Produser itu akan mempublikasikan pesan dengan words
pertukaran langsung. Tentu saja Anda dapat menyesuaikan konfigurasinya sesuai keinginan Anda.
Lalu katakanlah Anda ingin mempublikasikan konten beberapa file XML sehingga diproses oleh sekelompok konsumen. Anda dapat mempublikasikannya hanya dengan menggunakan perintah seperti ini:
$ find vendor/symfony/ -name " *.xml " -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words
Ini berarti Anda dapat membuat produser dengan perintah Unix biasa.
Mari kita menguraikan satu liner itu:
$ find vendor/symfony/ -name " *.xml " -print0
Perintah itu akan menemukan semua file .xml
di dalam folder symfony dan akan mencetak nama file. Masing-masing nama file tersebut kemudian disalurkan ke cat
melalui xargs
:
$ xargs -0 cat
Dan akhirnya keluaran cat
langsung menuju ke produser kita yang dipanggil seperti ini:
$ ./app/console rabbitmq:stdin-producer words
Hanya diperlukan satu argumen yaitu nama produser saat Anda mengonfigurasinya di file config.yml
Anda.
Tujuan dari bundel ini adalah membiarkan aplikasi Anda menghasilkan pesan dan menerbitkannya ke beberapa bursa yang Anda konfigurasikan.
Dalam beberapa kasus dan meskipun konfigurasi Anda benar, pesan yang Anda hasilkan tidak akan dialihkan ke antrean mana pun karena tidak ada. Konsumen yang bertanggung jawab atas konsumsi antrian harus dijalankan agar antrian dapat dibuat.
Meluncurkan perintah untuk setiap konsumen bisa menjadi mimpi buruk ketika jumlah konsumen banyak.
Untuk membuat pertukaran, antrian, dan pengikatan sekaligus dan memastikan Anda tidak akan kehilangan pesan apa pun, Anda dapat menjalankan perintah berikut:
$ ./app/console rabbitmq:setup-fabric
Jika diinginkan, Anda dapat mengonfigurasi konsumen dan produsen untuk berasumsi bahwa fabric RabbitMQ sudah ditentukan. Untuk melakukannya, tambahkan yang berikut ini ke konfigurasi Anda:
producers :
upload_picture :
auto_setup_fabric : false
consumers :
upload_picture :
auto_setup_fabric : false
Secara default, konsumen atau produsen akan mendeklarasikan semua yang dibutuhkannya dengan RabbitMQ saat dimulai. Hati-hati dalam menggunakan ini, ketika pertukaran atau antrian tidak ditentukan, akan terjadi kesalahan. Ketika Anda telah mengubah konfigurasi apa pun, Anda perlu menjalankan perintah setup-fabric di atas untuk mendeklarasikan konfigurasi Anda.
Untuk berkontribusi cukup buka Permintaan Tarik dengan kode baru Anda dengan mempertimbangkan bahwa jika Anda menambahkan fitur baru atau memodifikasi fitur yang sudah ada, Anda harus mendokumentasikan dalam README ini apa yang mereka lakukan. Jika Anda melanggar BC maka Anda harus mendokumentasikannya juga. Anda juga harus memperbarui CHANGELOG. Jadi:
Lihat: resources/meta/LICENSE.md
Struktur bundel dan dokumentasinya sebagian didasarkan pada RedisBundle