PHP-rdkafka est un client Kafka stable , prêt pour la production et rapide pour PHP basé sur librdkafka.
La version actuelle prend en charge PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8. La version 6.x prend en charge PHP 7.x..8.x, librdkafka 0.11..2.x. Les anciennes versions prennent en charge PHP 5.
Le but de l'extension est d'être une liaison librdkafka de bas niveau et sans opinion, axée sur la production et le support à long terme.
Les API de consommateur , de producteur et de métadonnées de haut niveau et de bas niveau sont prises en charge.
La documentation est disponible ici.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
Les paramètres de configuration utilisés ci-dessous peuvent être trouvés dans la référence de configuration Librdkafka
Pour produire, il faut d'abord créer un producteur, et y ajouter des courtiers (serveurs Kafka) :
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
Avertissement Assurez-vous que votre producteur suit un arrêt approprié (voir ci-dessous) pour ne pas perdre de messages.
Ensuite, nous créons une instance de sujet à partir du producteur :
<?php
$ topic = $ rk -> newTopic ( " test " );
À partir de là, nous pouvons produire autant de messages que nous le souhaitons, en utilisant la méthode Produce :
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
Le premier argument est la partition. RD_KAFKA_PARTITION_UA signifie unassigned et permet à librdkafka de choisir la partition.
Le deuxième argument est constitué d'indicateurs de message et doit être soit 0
ou RD_KAFKA_MSG_F_BLOCK
pour bloquer la production en file d'attente pleine. La charge utile du message peut être n'importe quoi.
Cela doit être fait avant de détruire une instance de producteur
pour s'assurer que toutes les demandes de produits en file d'attente et en vol sont complétées
avant de terminer. Utilisez une valeur raisonnable pour $timeout_ms
.
Avertissement Ne pas appeler flush peut entraîner une perte de message !
$ rk -> flush ( $ timeout_ms );
Si vous ne souhaitez pas envoyer des messages qui n'ont pas encore été envoyés, vous pouvez utiliser purge()
avant d'appeler flush()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
La classe RdKafkaKafkaConsumer prend en charge l'attribution/révocation automatique des partitions. Voir l'exemple ici.
Remarque Le consommateur de bas niveau est une API héritée, veuillez préférer utiliser le consommateur de haut niveau
Il faut d'abord créer un consommateur de bas niveau, et y ajouter des courtiers (serveurs Kafka) :
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
Ensuite, créez une instance de sujet en appelant la méthode newTopic()
et commencez à consommer sur la partition 0 :
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
Ensuite, récupérez les messages consommés :
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
Remarque Le consommateur de bas niveau est une API héritée, veuillez préférer utiliser le consommateur de haut niveau
La consommation à partir de plusieurs sujets et/ou partitions peut être effectuée en demandant à librdkafka de transférer tous les messages de ces sujets/partitions vers une file d'attente interne, puis en consommant à partir de cette file d'attente :
Création de la file d'attente :
<?php
$ queue = $ rk -> newQueue ();
Ajout de partitions de sujets à la file d'attente :
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
Ensuite, récupérez les messages consommés dans la file d'attente :
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka stocke par défaut les compensations sur le courtier.
Si vous utilisez un fichier local pour le stockage offset, le fichier est créé par défaut dans le répertoire actuel, avec un nom basé sur le sujet et la partition. Le répertoire peut être modifié en définissant la propriété de configuration offset.store.path
.
Pour contrôler manuellement le décalage, définissez enable.auto.offset.store
sur false
.
Les paramètres auto.commit.interval.ms
et auto.commit.enable
contrôleront
si les compensations stockées seront automatiquement validées auprès du courtier et dans quel intervalle.
Pour contrôler manuellement le décalage, définissez enable.auto.commit
sur false
.
Temps maximum autorisé entre les appels pour consommer les messages destinés aux consommateurs de haut niveau.
Si cet intervalle est dépassé, le consommateur est considéré comme ayant échoué et le groupe sera
rééquilibrer afin de réaffecter les partitions à un autre membre du groupe de consommateurs.
group.id
est responsable de la définition de votre identifiant de groupe de consommateurs et il doit être unique (et ne doit pas changer). Kafka l'utilise pour reconnaître les applications et stocker leurs compensations.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Référence de configuration de Librdkafka
librdkafka mettra en mémoire tampon jusqu'à 1 Go de messages pour chaque partition consommée par défaut. Vous pouvez réduire l'utilisation de la mémoire en réduisant la valeur du paramètre queued.max.messages.kbytes
sur vos consommateurs.
Chaque instance de consommateur et de producteur récupérera les métadonnées des sujets à un intervalle défini par le paramètre topic.metadata.refresh.interval.ms
. Selon votre version de librdkafka, le paramètre est par défaut de 10 secondes ou 600 secondes.
librdkafka récupère les métadonnées de tous les sujets du cluster par défaut. Définir topic.metadata.refresh.sparse
sur la chaîne "true"
garantit que librdkafka récupère uniquement les sujets qu'il utilise.
Définir topic.metadata.refresh.sparse
sur "true"
et topic.metadata.refresh.interval.ms
sur 600 secondes (plus un peu de gigue) peut réduire considérablement la bande passante, en fonction du nombre de consommateurs et de sujets.
Ce paramètre permet aux threads librdkafka de se terminer dès que librdkafka en a terminé avec eux. Cela permet effectivement à vos processus/requêtes PHP de se terminer rapidement.
Lorsque vous activez cela, vous devez masquer le signal comme ceci :
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
Durée maximale pendant laquelle une opération de socket de courtier peut se bloquer. Une valeur inférieure améliore la réactivité au détriment d'une utilisation du processeur légèrement plus élevée.
La réduction de la valeur de ce paramètre améliore la vitesse d’arrêt. La valeur définit la durée maximale pendant laquelle librdkafka bloquera dans une itération d'une boucle de lecture. Cela définit également la fréquence à laquelle le thread principal librdkafka vérifiera la terminaison.
Ceci définit le temps maximum et par défaut que librdkafka attendra avant d'envoyer un lot de messages. Réduire ce paramètre à par exemple 1 ms garantit que les messages sont envoyés dès que possible, au lieu d'être groupés.
Cela a été constaté pour réduire le temps d'arrêt de l'instance rdkafka et du processus/requête PHP.
Voici une configuration optimisée pour une faible latence. Cela permet à un processus/requête PHP d'envoyer des messages dès que possible et de se terminer rapidement.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
Il est conseillé d'appeler Poll à intervalles réguliers pour effectuer des rappels. Dans php-rdkafka:3.x
poll a également été appelé pendant l'arrêt, donc ne pas l'appeler à intervalles réguliers pourrait
entraîner un arrêt légèrement plus long. L'exemple ci-dessous interroge jusqu'à ce qu'il n'y ait plus d'événements dans la file d'attente :
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
La source de la documentation peut être trouvée ici
Si la documentation ne suffit pas, n'hésitez pas à poser des questions sur les canaux php-rdkafka sur Gitter ou Google Groups.
Parce que votre IDE n'est pas capable de découvrir automatiquement l'API php-rdkadka, vous pouvez envisager d'utiliser un package externe fournissant un ensemble de stubs pour les classes, fonctions et constantes php-rdkafka : kwn/php-rdkafka-stubs
Si vous souhaitez contribuer, merci :)
Avant de commencer, veuillez jeter un œil au document CONTRIBUTING pour voir comment fusionner vos modifications.
Documentation copiée de librdkafka.
Auteurs : voir contributeurs.
php-rdkafka est publié sous la licence MIT.