中文文档
Kafka-php adalah klien kafka PHP murni yang saat ini mendukung Kafka versi lebih besar dari 0.8.x, proyek ini v0.2.x dan v0.1.x tidak kompatibel jika menggunakan v0.1.x asli. Anda dapat merujuk ke document Dokumen Kafka PHP v0.1.x, tetapi disarankan untuk beralih ke v0.2.x . v0.2.x menggunakan implementasi asinkron PHP dan interaksi broker kafka, lebih stabil daripada v0.1.x efisien, karena penggunaan bahasa PHP sehingga tidak mengkompilasi perluasan apa pun dapat digunakan untuk mengurangi biaya akses dan pemeliharaan
Tambahkan direktori lib ke PHP include_path dan gunakan autoloader seperti yang ada di direktori contoh (kode mengikuti konvensi satu kelas per file PEAR/Zend).
Cukup tambahkan ketergantungan nmred/kafka-php
ke proyek Anda jika Anda menggunakan Komposer untuk mengelola ketergantungan proyek Anda.
$ composer require nmred/kafka-php
Berikut adalah contoh minimal file composer.json :
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
Properti konfigurasi didokumentasikan dalam Konfigurasi
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer (
function () {
return [
[
' topic ' => ' test ' ,
' value ' => ' test....message. ' ,
' key ' => ' testkey ' ,
],
];
}
);
$ producer -> setLogger ( $ logger );
$ producer -> success ( function ( $ result ) {
var_dump ( $ result );
});
$ producer -> error ( function ( $ errorCode ) {
var_dump ( $ errorCode );
});
$ producer -> send ( true );
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 127.0.0.1:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer ();
$ producer -> setLogger ( $ logger );
for ( $ i = 0 ; $ i < 100 ; $ i ++) {
$ producer -> send ([
[
' topic ' => ' test1 ' ,
' value ' => ' test1....message. ' ,
' key ' => '' ,
],
]);
}
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ConsumerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setGroupId ( ' test ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setTopics ([ ' test ' ]);
// $ config - > setOffsetReset ( 'earliest' );
$ consumer = new Kafka Consumer ();
$ consumer -> setLogger ( $ logger );
$ consumer -> start ( function ( $ topic , $ part , $ message ) {
var_dump ( $ message );
});
Lihat Contoh
Grup 1: 531522091 Grup 2: 657517955