PHP-rdkafka adalah klien Kafka yang stabil , siap produksi , dan cepat untuk PHP berdasarkan librdkafka.
Versi saat ini mendukung PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8. Versi 6.x mendukung PHP 7.x..8.x, librdkafka 0.11..2.x. Versi yang lebih lama mendukung PHP 5.
Tujuan dari perluasan ini adalah menjadi pengikatan librdkafka tingkat rendah tanpa opini yang berfokus pada produksi dan dukungan jangka panjang.
API konsumen , produsen , dan metadata tingkat tinggi dan rendah didukung.
Dokumentasi tersedia di sini.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
Parameter konfigurasi yang digunakan di bawah ini dapat ditemukan di referensi Konfigurasi Librdkafka
Untuk memproduksi, pertama-tama kita perlu membuat produser, dan menambahkan broker (server Kafka) ke dalamnya:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
Peringatan Pastikan produser Anda mengikuti penghentian yang benar (lihat di bawah) agar tidak kehilangan pesan.
Selanjutnya, kita membuat instance topik dari produser:
<?php
$ topic = $ rk -> newTopic ( " test " );
Dari sana, kita dapat menghasilkan pesan sebanyak yang kita inginkan, dengan menggunakan metode produksi:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
Argumen pertama adalah partisi. RD_KAFKA_PARTITION_UA adalah singkatan dari unassigned , dan memungkinkan librdkafka memilih partisi.
Argumen kedua adalah tanda pesan dan harus bernilai 0
atau RD_KAFKA_MSG_F_BLOCK
untuk memblokir produksi pada antrian penuh. Muatan pesan bisa berupa apa saja.
Ini harus dilakukan sebelum menghancurkan contoh produsen
untuk memastikan semua permintaan produk dalam antrian dan dalam penerbangan telah diselesaikan
sebelum mengakhiri. Gunakan nilai yang wajar untuk $timeout_ms
.
Peringatan Tidak menelepon flush dapat menyebabkan hilangnya pesan!
$ rk -> flush ( $ timeout_ms );
Jika Anda tidak ingin mengirim pesan yang belum terkirim, Anda dapat menggunakan purge()
sebelum memanggil flush()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
Kelas RdKafkaKafkaConsumer mendukung penetapan/pencabutan partisi otomatis. Lihat contohnya di sini.
Catatan Konsumen tingkat rendah adalah API lama, harap pilih menggunakan konsumen tingkat tinggi
Pertama-tama kita perlu membuat konsumen tingkat rendah, dan menambahkan broker (server Kafka) ke dalamnya:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
Selanjutnya, buat instance topik dengan memanggil metode newTopic()
, dan mulai menggunakan partisi 0:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
Selanjutnya, ambil pesan yang digunakan:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
Catatan Konsumen tingkat rendah adalah API lama, harap pilih menggunakan konsumen tingkat tinggi
Mengonsumsi beberapa topik dan/atau partisi dapat dilakukan dengan memberi tahu librdkafka untuk meneruskan semua pesan dari topik/partisi ini ke antrean internal, lalu mengonsumsi dari antrean ini:
Membuat antrian:
<?php
$ queue = $ rk -> newQueue ();
Menambahkan partisi topik ke antrian:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
Selanjutnya, ambil pesan yang digunakan dari antrean:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka per default menyimpan offset pada broker.
Jika Anda menggunakan file lokal untuk penyimpanan offset, maka secara default file tersebut dibuat di direktori saat ini, dengan nama berdasarkan topik dan partisi. Direktori dapat diubah dengan mengatur properti konfigurasi offset.store.path
.
Untuk mengontrol offset secara manual, setel enable.auto.offset.store
ke false
.
Pengaturan auto.commit.interval.ms
dan auto.commit.enable
akan mengontrol
apakah offset yang disimpan akan dikomit secara otomatis ke broker dan dalam interval berapa.
Untuk mengontrol offset secara manual, setel enable.auto.commit
ke false
.
Waktu maksimum yang diperbolehkan antar panggilan untuk menggunakan pesan bagi konsumen tingkat tinggi.
Jika interval ini terlampaui konsumen dianggap gagal dan kelompok akan dianggap gagal
menyeimbangkan kembali untuk menetapkan kembali partisi ke anggota kelompok konsumen lainnya.
group.id
bertanggung jawab untuk mengatur ID grup konsumen Anda dan harus unik (dan tidak boleh berubah). Kafka menggunakannya untuk mengenali aplikasi dan menyimpan offsetnya.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Referensi Konfigurasi Librdkafka
librdkafka akan melakukan buffering hingga 1GB pesan untuk setiap partisi yang digunakan secara default. Anda dapat menurunkan penggunaan memori dengan mengurangi nilai parameter queued.max.messages.kbytes
pada konsumen Anda.
Setiap instance konsumen dan produsen akan mengambil metadata topik pada interval yang ditentukan oleh parameter topic.metadata.refresh.interval.ms
. Bergantung pada versi librdkafka Anda, parameter defaultnya adalah 10 detik, atau 600 detik.
librdkafka mengambil metadata untuk semua topik cluster secara default. Menyetel topic.metadata.refresh.sparse
ke string "true"
memastikan bahwa librdkafka hanya mengambil topik yang dia gunakan.
Menyetel topic.metadata.refresh.sparse
ke "true"
, dan topic.metadata.refresh.interval.ms
menjadi 600 detik (ditambah beberapa jitter) dapat mengurangi bandwidth secara signifikan, bergantung pada jumlah konsumen dan topik.
Pengaturan ini memungkinkan thread librdkafka untuk dihentikan segera setelah librdkafka selesai menggunakannya. Ini secara efektif memungkinkan proses/permintaan PHP Anda dihentikan dengan cepat.
Saat mengaktifkan ini, Anda harus menutupi sinyal seperti ini:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
Waktu maksimum yang dapat diblokir oleh operasi soket broker. Nilai yang lebih rendah akan meningkatkan responsivitas dengan mengorbankan penggunaan CPU yang sedikit lebih tinggi.
Mengurangi nilai pengaturan ini akan meningkatkan kecepatan mematikan. Nilai tersebut menentukan waktu maksimum yang akan diblokir librdkafka dalam satu iterasi dari loop baca. Ini juga menentukan seberapa sering thread librdkafka utama akan memeriksa penghentian.
Ini menentukan waktu tunggu maksimum dan default librdkafka sebelum mengirim sejumlah pesan. Mengurangi pengaturan ini menjadi misalnya 1 ms memastikan bahwa pesan dikirim secepatnya, bukan dalam batch.
Hal ini terlihat mengurangi waktu penutupan instance rdkafka, dan proses/permintaan PHP.
Berikut adalah konfigurasi yang dioptimalkan untuk latensi rendah. Hal ini memungkinkan proses/permintaan PHP untuk mengirim pesan secepatnya dan diakhiri dengan cepat.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
Disarankan untuk memanggil polling secara berkala untuk melayani panggilan balik. Di php-rdkafka:3.x
jajak pendapat juga dipanggil selama penutupan, jadi mungkin tidak memanggilnya secara berkala
menyebabkan penghentian yang sedikit lebih lama. Contoh di bawah ini melakukan polling hingga tidak ada lagi event di antrian:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
Sumber dokumentasi dapat ditemukan di sini
Jika dokumentasinya tidak cukup, silakan ajukan pertanyaan di saluran php-rdkafka di Gitter atau Google Grup.
Karena IDE Anda tidak dapat menemukan api php-rdkadka secara otomatis, Anda dapat mempertimbangkan penggunaan paket eksternal yang menyediakan satu set stub untuk kelas, fungsi, dan konstanta php-rdkafka: kwn/php-rdkafka-stubs
Jika Anda ingin berkontribusi, terima kasih :)
Sebelum memulai, silakan lihat dokumen KONTRIBUSI untuk mengetahui cara menggabungkan perubahan Anda.
Dokumentasi disalin dari librdkafka.
Penulis: lihat kontributor.
php-rdkafka dirilis di bawah lisensi MIT.