中文档
Kafka-php เป็นไคลเอนต์ PHP kafka ล้วนๆ ที่ปัจจุบันรองรับ Kafka เวอร์ชันมากกว่า 0.8.x โปรเจ็กต์ v0.2.x และ v0.1.x นี้เข้ากันไม่ได้หากใช้ v0.1.x ดั้งเดิม คุณสามารถอ้างอิงถึง document Kafka PHP v0.1.x Document แต่แนะนำให้เปลี่ยนเป็น v0.2.x v0.2.x ใช้ PHP แบบอะซิงโครนัสและการโต้ตอบของโบรกเกอร์ kafka มีเสถียรภาพมากกว่า v0.1.x มีประสิทธิภาพ เนื่องจากการใช้ภาษา PHP ดังนั้นอย่าคอมไพล์ส่วนขยายใด ๆ สามารถใช้เพื่อลดต้นทุนการเข้าถึงและบำรุงรักษา
เพิ่มไดเร็กทอรี lib ลงใน PHP include_path และใช้ตัวโหลดอัตโนมัติเหมือนกับที่อยู่ในไดเร็กทอรีตัวอย่าง (โค้ดเป็นไปตามแบบแผน PEAR/Zend one-class-per-file)
เพียงเพิ่มการขึ้นต่อกัน nmred/kafka-php
ให้กับโปรเจ็กต์ของคุณ หากคุณใช้ Composer เพื่อจัดการการขึ้นต่อกันของโปรเจ็กต์ของคุณ
$ composer require nmred/kafka-php
นี่คือตัวอย่างขั้นต่ำของไฟล์ composer.json :
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
คุณสมบัติการกำหนดค่าได้รับการบันทึกไว้ในการกำหนดค่า
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer (
function () {
return [
[
' topic ' => ' test ' ,
' value ' => ' test....message. ' ,
' key ' => ' testkey ' ,
],
];
}
);
$ producer -> setLogger ( $ logger );
$ producer -> success ( function ( $ result ) {
var_dump ( $ result );
});
$ producer -> error ( function ( $ errorCode ) {
var_dump ( $ errorCode );
});
$ producer -> send ( true );
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ProducerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 127.0.0.1:9192 ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setRequiredAck ( 1 );
$ config -> setIsAsyn ( false );
$ config -> setProduceInterval ( 500 );
$ producer = new Kafka Producer ();
$ producer -> setLogger ( $ logger );
for ( $ i = 0 ; $ i < 100 ; $ i ++) {
$ producer -> send ([
[
' topic ' => ' test1 ' ,
' value ' => ' test1....message. ' ,
' key ' => '' ,
],
]);
}
<?php
require ' ../vendor/autoload.php ' ;
date_default_timezone_set ( ' PRC ' );
use Monolog Logger ;
use Monolog Handler StdoutHandler ;
// Create the logger
$ logger = new Logger ( ' my_logger ' );
// Now add some handlers
$ logger -> pushHandler ( new StdoutHandler ());
$ config = Kafka ConsumerConfig:: getInstance ();
$ config -> setMetadataRefreshIntervalMs ( 10000 );
$ config -> setMetadataBrokerList ( ' 10.13.4.159:9192 ' );
$ config -> setGroupId ( ' test ' );
$ config -> setBrokerVersion ( ' 1.0.0 ' );
$ config -> setTopics ([ ' test ' ]);
// $ config - > setOffsetReset ( 'earliest' );
$ consumer = new Kafka Consumer ();
$ consumer -> setLogger ( $ logger );
$ consumer -> start ( function ( $ topic , $ part , $ message ) {
var_dump ( $ message );
});
อ้างอิงตัวอย่าง
กลุ่ม 1: 531522091 กลุ่ม 2: 657517955