PHP-rdkafka es un cliente Kafka estable , listo para producción y rápido para PHP basado en librdkafka.
La versión actual es compatible con PHP >= 8.1.0, librdkafka >= 1.5.3, Kafka >= 0.8. La versión 6.x es compatible con PHP 7.x..8.x, librdkafka 0.11..2.x. Las versiones anteriores son compatibles con PHP 5.
El objetivo de la extensión es convertirse en una vinculación de librdkafka de bajo nivel y sin opiniones, centrada en la producción y el soporte a largo plazo.
Se admiten las API de metadatos , productores y consumidores de alto y bajo nivel.
La documentación está disponible aquí.
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
Los parámetros de configuración utilizados a continuación se pueden encontrar en la referencia de configuración de Librdkafka
Para producir, primero necesitamos crear un productor y agregarle corredores (servidores Kafka):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );
Advertencia Asegúrese de que su productor siga el cierre adecuado (ver más abajo) para no perder mensajes.
A continuación, creamos una instancia de tema del productor:
<?php
$ topic = $ rk -> newTopic ( " test " );
A partir de ahí, podemos producir tantos mensajes como queramos, utilizando el método producir:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );
El primer argumento es la partición. RD_KAFKA_PARTITION_UA significa no asignado y permite a librdkafka elegir la partición.
El segundo argumento son indicadores de mensaje y deben ser 0
o RD_KAFKA_MSG_F_BLOCK
para bloquear productos en cola llena. La carga útil del mensaje puede ser cualquier cosa.
Esto debe hacerse antes de destruir una instancia de productor.
para asegurarse de que se completen todas las solicitudes de productos agrícolas en cola y en vuelo
antes de terminar. Utilice un valor razonable para $timeout_ms
.
Advertencia ¡No llamar a flush puede provocar la pérdida del mensaje!
$ rk -> flush ( $ timeout_ms );
En caso de que no le importe enviar mensajes que aún no se han enviado, puede usar purge()
antes de llamar flush()
:
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );
La clase RdKafkaKafkaConsumer admite la asignación/revocación automática de particiones. Vea el ejemplo aquí.
Nota El consumidor de bajo nivel es una API heredada; prefiera utilizar el consumidor de alto nivel.
Primero necesitamos crear un consumidor de bajo nivel y agregarle corredores (servidores Kafka):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );
A continuación, cree una instancia de tema llamando al método newTopic()
y comience a consumir en la partición 0:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );
A continuación, recupere los mensajes consumidos:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
Nota El consumidor de bajo nivel es una API heredada; prefiera utilizar el consumidor de alto nivel.
Se puede consumir desde múltiples temas y/o particiones diciéndole a librdkafka que reenvíe todos los mensajes de estos temas/particiones a una cola interna y luego consumir desde esta cola:
Creando la cola:
<?php
$ queue = $ rk -> newQueue ();
Agregar particiones de temas a la cola:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
A continuación, recupere los mensajes consumidos de la cola:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}
librdkafka por defecto almacena compensaciones en el corredor.
Si está utilizando un archivo local para el almacenamiento compensado, de forma predeterminada el archivo se crea en el directorio actual, con un nombre basado en el tema y la partición. El directorio se puede cambiar configurando la propiedad de configuración offset.store.path
.
Para controlar manualmente el desplazamiento, establezca enable.auto.offset.store
en false
.
Las configuraciones auto.commit.interval.ms
y auto.commit.enable
controlarán
si las compensaciones almacenadas se enviarán automáticamente al corredor y en qué intervalo.
Para controlar manualmente el desplazamiento, establezca enable.auto.commit
en false
.
Tiempo máximo permitido entre llamadas para consumir mensajes para consumidores de alto nivel.
Si se excede este intervalo, el consumidor se considera fallido y el grupo
reequilibre para reasignar las particiones a otro miembro del grupo de consumidores.
group.id
es responsable de configurar el ID de su grupo de consumidores y debe ser único (y no debe cambiar). Kafka lo utiliza para reconocer aplicaciones y almacenar compensaciones para ellas.
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );
Referencia de configuración de Librdkafka
librdkafka almacenará en búfer hasta 1 GB de mensajes para cada partición consumida de forma predeterminada. Puede reducir el uso de memoria reduciendo el valor del parámetro queued.max.messages.kbytes
en sus consumidores.
Cada instancia de consumidor y productor recuperará los metadatos de los temas en un intervalo definido por el parámetro topic.metadata.refresh.interval.ms
. Dependiendo de su versión de librdkafka, el parámetro predeterminado es 10 segundos o 600 segundos.
librdkafka recupera los metadatos de todos los temas del clúster de forma predeterminada. Establecer topic.metadata.refresh.sparse
en la cadena "true"
garantiza que librdkafka obtenga solo los temas que utiliza.
Configurar topic.metadata.refresh.sparse
en "true"
y topic.metadata.refresh.interval.ms
en 600 segundos (más algo de fluctuación) puede reducir mucho el ancho de banda, dependiendo de la cantidad de consumidores y temas.
Esta configuración permite que los subprocesos de librdkafka finalicen tan pronto como librdkafka termine con ellos. Esto efectivamente permite que sus procesos/solicitudes PHP finalicen rápidamente.
Al habilitar esto, debes enmascarar la señal de esta manera:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
Tiempo máximo que puede bloquear una operación de socket de intermediario. Un valor más bajo mejora la capacidad de respuesta a expensas de un uso de CPU ligeramente mayor.
Reducir el valor de esta configuración mejora la velocidad de apagado. El valor define el tiempo máximo que librdkafka bloqueará en una iteración de un bucle de lectura. Esto también define la frecuencia con la que el hilo principal de librdkafka comprobará la terminación.
Esto define el tiempo máximo y predeterminado que esperará librdkafka antes de enviar un lote de mensajes. Reducir esta configuración a, por ejemplo, 1 ms garantiza que los mensajes se envíen lo antes posible, en lugar de ser por lotes.
Se ha visto que esto reduce el tiempo de apagado de la instancia rdkafka y del proceso/solicitud PHP.
Aquí hay una configuración optimizada para baja latencia. Esto permite que un proceso/solicitud PHP envíe mensajes lo antes posible y finalice rápidamente.
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );
Se recomienda llamar a la encuesta a intervalos regulares para atender las devoluciones de llamada. En php-rdkafka:3.x
La encuesta también se llamó durante el cierre, por lo que no llamarla en intervalos regulares podría
provocará una parada un poco más larga. El siguiente ejemplo sondea hasta que no hay más eventos en la cola:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
La fuente de la documentación se puede encontrar aquí.
Si la documentación no es suficiente, no dude en hacer preguntas en los canales php-rdkafka en Gitter o Grupos de Google.
Debido a que su IDE no puede descubrir automáticamente la API de php-rdkadka, puede considerar el uso de un paquete externo que proporcione un conjunto de códigos auxiliares para clases, funciones y constantes de php-rdkafka: kwn/php-rdkafka-stubs
Si quieres contribuir, gracias :)
Antes de comenzar, eche un vistazo al documento CONTRIBUCIÓN para ver cómo fusionar sus cambios.
Documentación copiada de librdkafka.
Autores: ver colaboradores.
php-rdkafka se publica bajo la licencia MIT.