PHP-rdkafka ist ein stabiler , produktionsbereiter und schneller Kafka-Client für PHP, der auf librdkafka basiert.
Die aktuelle Version unterstützt PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8. Version 6.x unterstützt PHP 7.x..8.x, librdkafka 0.11..2.x. Ältere Versionen unterstützen PHP 5.
Das Ziel der Erweiterung besteht darin, eine unparteiische Librdkafka-Bindung auf niedrigem Niveau zu schaffen, die sich auf Produktion und langfristige Unterstützung konzentriert.
Die High-Level- und Low-Level -Consumer- , Producer- und Metadaten- APIs werden unterstützt.
Die Dokumentation finden Sie hier.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
Die unten verwendeten Konfigurationsparameter finden Sie in der Librdkafka-Konfigurationsreferenz
Zum Produzieren müssen wir zunächst einen Produzenten erstellen und diesem Broker (Kafka-Server) hinzufügen:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
Warnung Stellen Sie sicher, dass Ihr Produzent das ordnungsgemäße Herunterfahren befolgt (siehe unten), um keine Nachrichten zu verlieren.
Als nächstes erstellen wir eine Themeninstanz vom Produzenten:
<?php
$ topic = $ rk -> newTopic ( " test " );
Von dort aus können wir mit der Methode „produzieren“ so viele Nachrichten produzieren, wie wir möchten:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
Das erste Argument ist die Partition. RD_KAFKA_PARTITION_UA steht für unassigned und lässt librdkafka die Partition auswählen.
Das zweite Argument sind Nachrichtenflags und sollten entweder 0 sein
oder RD_KAFKA_MSG_F_BLOCK
um die Produktion in einer vollen Warteschlange zu blockieren. Die Nachrichtennutzlast kann alles sein.
Dies sollte vor der Zerstörung einer Produzenteninstanz erfolgen
um sicherzustellen, dass alle in der Warteschlange stehenden und während des Flugs anfallenden Produktanfragen abgeschlossen werden
vor der Beendigung. Verwenden Sie einen angemessenen Wert für $timeout_ms
.
Warnung: Wenn Sie Flush nicht aufrufen, kann dies zum Verlust von Nachrichten führen!
$ rk -> flush ( $ timeout_ms );
Falls es Ihnen egal ist, Nachrichten zu senden, die noch nicht gesendet wurden, können Sie purge()
verwenden, bevor Sie flush()
aufrufen:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
Die RdKafkaKafkaConsumer-Klasse unterstützt die automatische Zuweisung/Widerrufung von Partitionen. Sehen Sie sich das Beispiel hier an.
Hinweis Der Low-Level-Consumer ist eine Legacy-API. Bitte verwenden Sie lieber den High-Level-Consumer
Wir müssen zunächst einen Low-Level-Consumer erstellen und diesem Broker (Kafka-Server) hinzufügen:
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
Erstellen Sie als Nächstes eine Themeninstanz, indem Sie die Methode newTopic()
aufrufen, und beginnen Sie mit der Nutzung auf Partition 0:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
Rufen Sie als Nächstes die konsumierten Nachrichten ab:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
Hinweis Der Low-Level-Consumer ist eine Legacy-API. Bitte verwenden Sie lieber den High-Level-Consumer
Der Konsum aus mehreren Themen und/oder Partitionen kann erfolgen, indem Sie librdkafka anweisen, alle Nachrichten von diesen Themen/Partitionen an eine interne Warteschlange weiterzuleiten, und dann aus dieser Warteschlange konsumieren:
Erstellen der Warteschlange:
<?php
$ queue = $ rk -> newQueue ();
Themenpartitionen zur Warteschlange hinzufügen:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
Rufen Sie als Nächstes die verbrauchten Nachrichten aus der Warteschlange ab:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka speichert standardmäßig Offsets auf dem Broker.
Wenn Sie eine lokale Datei für die Offset-Speicherung verwenden, wird die Datei standardmäßig im aktuellen Verzeichnis erstellt, mit einem Namen, der auf dem Thema und der Partition basiert. Das Verzeichnis kann durch Festlegen der Konfigurationseigenschaft offset.store.path
geändert werden.
Um den Offset manuell zu steuern, setzen Sie enable.auto.offset.store
auf false
.
Die Einstellungen auto.commit.interval.ms
und auto.commit.enable
steuern
ob und in welchem Intervall die gespeicherten Offsets automatisch an den Broker übermittelt werden.
Um den Offset manuell zu steuern, setzen Sie enable.auto.commit
auf false
.
Maximal zulässige Zeit zwischen Aufrufen zum Konsumieren von Nachrichten für High-Level-Consumer.
Wenn dieses Intervall überschritten wird, gilt der Verbraucher als ausgefallen und die Gruppe wird ausgefallen sein
Neuverteilung, um die Partitionen einem anderen Mitglied der Verbrauchergruppe neu zuzuweisen.
group.id
ist für die Festlegung Ihrer Verbrauchergruppen-ID verantwortlich und diese sollte eindeutig sein (und sich nicht ändern). Kafka nutzt es, um Anwendungen zu erkennen und Offsets für sie zu speichern.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Referenz zur Librdkafka-Konfiguration
librdkafka puffert standardmäßig bis zu 1 GB an Nachrichten für jede genutzte Partition. Sie können die Speichernutzung senken, indem Sie den Wert des Parameters queued.max.messages.kbytes
auf Ihren Verbrauchern reduzieren.
Jede Consumer- und Producer-Instanz ruft Themenmetadaten in einem durch den Parameter topic.metadata.refresh.interval.ms
definierten Intervall ab. Abhängig von Ihrer librdkafka-Version ist der Parameter standardmäßig auf 10 Sekunden oder 600 Sekunden eingestellt.
librdkafka ruft standardmäßig die Metadaten für alle Themen des Clusters ab. Durch Setzen von topic.metadata.refresh.sparse
auf die Zeichenfolge "true"
wird sichergestellt, dass librdkafka nur die von ihm verwendeten Themen abruft.
Wenn Sie topic.metadata.refresh.sparse
auf "true"
und topic.metadata.refresh.interval.ms
auf 600 Sekunden (plus etwas Jitter) setzen, kann die Bandbreite abhängig von der Anzahl der Verbraucher und Themen erheblich reduziert werden.
Mit dieser Einstellung können librdkafka-Threads beendet werden, sobald librdkafka mit ihnen fertig ist. Dadurch können Ihre PHP-Prozesse/Anfragen effektiv schnell beendet werden.
Wenn Sie dies aktivieren, müssen Sie das Signal wie folgt maskieren:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
Maximale Zeit, die ein Broker-Socket-Vorgang blockieren darf. Ein niedrigerer Wert verbessert die Reaktionsfähigkeit auf Kosten einer etwas höheren CPU-Auslastung.
Durch Verringern des Werts dieser Einstellung wird die Abschaltgeschwindigkeit verbessert. Der Wert definiert die maximale Zeit, die librdkafka in einer Iteration einer Leseschleife blockiert. Dies definiert auch, wie oft der Hauptthread von librdkafka auf Beendigung prüft.
Dies definiert die maximale und standardmäßige Zeit, die librdkafka wartet, bevor es einen Nachrichtenstapel sendet. Durch die Reduzierung dieser Einstellung auf z. B. 1 ms wird sichergestellt, dass Nachrichten so schnell wie möglich und nicht gestapelt gesendet werden.
Es hat sich gezeigt, dass dies die Abschaltzeit der rdkafka-Instanz und des PHP-Prozesses/der PHP-Anfrage verkürzt.
Hier ist eine Konfiguration, die für geringe Latenz optimiert ist. Dadurch kann ein PHP-Prozess/eine PHP-Anfrage so schnell wie möglich Nachrichten senden und schnell beendet werden.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
Es wird empfohlen, die Umfrage in regelmäßigen Abständen anzurufen, um Rückrufe zu ermöglichen. In php-rdkafka:3.x
poll wurde auch während des Herunterfahrens aufgerufen, daher könnte es sein, dass es nicht in regelmäßigen Abständen aufgerufen wird
zu einem etwas längeren Stillstand führen. Im folgenden Beispiel werden Abfragen durchgeführt, bis sich keine Ereignisse mehr in der Warteschlange befinden:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
Die Quelle der Dokumentation finden Sie hier
Wenn die Dokumentation nicht ausreicht, können Sie gerne eine Frage auf den php-rdkafka-Kanälen auf Gitter oder Google Groups stellen.
Da Ihre IDE die PHP-Rdkafka-API nicht automatisch erkennen kann, können Sie die Verwendung eines externen Pakets in Betracht ziehen, das eine Reihe von Stubs für PHP-Rdkafka-Klassen, -Funktionen und -Konstanten bereitstellt: kwn/php-rdkafka-stubs
Wenn Sie einen Beitrag leisten möchten, vielen Dank :)
Bevor Sie beginnen, werfen Sie bitte einen Blick auf das Dokument „CONTRIBUTING“, um zu erfahren, wie Sie Ihre Änderungen zusammenführen können.
Dokumentation von librdkafka kopiert.
Autoren: siehe Mitwirkende.
php-rdkafka wird unter der MIT-Lizenz veröffentlicht.