DEBIDO A PROBLEMAS CON LOS LANZAMIENTOS, SE SUGIERE UTILIZAR https://github.com/wbarnha/kafka-python-ng POR EL MOMENTO
Cliente Python para el sistema de procesamiento de flujo distribuido Apache Kafka. kafka-python está diseñado para funcionar de manera muy similar al cliente java oficial, con algunas interfaces pitónicas (por ejemplo, iteradores de consumidor).
kafka-python se utiliza mejor con brokers más nuevos (0.9+), pero es compatible con versiones anteriores (hasta 0.8.0). Algunas funciones solo estarán habilitadas en corredores más nuevos. Por ejemplo, grupos de consumidores totalmente coordinados (es decir, asignación de partición dinámica a múltiples consumidores en el mismo grupo) requieren el uso de 0.9+ brokers Kafka. Para admitir esta función en versiones anteriores del corredor, sería necesario escribir y mantener un código personalizado de elección de liderazgo y membresía/verificación de estado (quizás usando zookeeper o cónsul). Para los brokers más antiguos, puede lograr algo similar asignando manualmente diferentes particiones a cada instancia de consumidor con herramientas de administración de configuración como chef, ansible, etc. Este enfoque funcionará bien, aunque no admite el reequilibrio en caso de fallas. Consulte <https://kafka-python.readthedocs.io/en/master/compatibility.html> para obtener más detalles.
Tenga en cuenta que la rama maestra puede contener funciones inéditas. Para obtener la documentación de la versión, consulte readthedocs y/o la ayuda en línea de Python.
>>> pip install kafka - python
KafkaConsumer es un consumidor de mensajes de alto nivel, destinado a funcionar de la forma más similar posible al cliente Java oficial. El soporte total para grupos de consumidores coordinados requiere el uso de agentes Kafka que admitan las API del grupo: kafka v0.9+.
Consulte <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> para obtener detalles de configuración y API.
El iterador del consumidor devuelve ConsumerRecords, que son tuplas con nombre simples que exponen los atributos básicos del mensaje: tema, partición, desplazamiento, clave y valor:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer es un productor de mensajes asincrónicos de alto nivel. La clase está destinada a funcionar de la forma más similar posible al cliente Java oficial. Consulte <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> para obtener más detalles.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
KafkaProducer se puede utilizar en subprocesos sin problemas, a diferencia de KafkaConsumer, que no puede.
Si bien es posible utilizar KafkaConsumer de manera local, se recomienda el multiprocesamiento.
kafka-python admite los siguientes formatos de compresión:
gzip es compatible de forma nativa, los demás requieren la instalación de bibliotecas adicionales. Consulte <https://kafka-python.readthedocs.io/en/master/install.html> para obtener más información.
Kafka utiliza sumas de verificación CRC32 para validar mensajes. kafka-python incluye una implementación pura de Python para mayor compatibilidad. Para mejorar el rendimiento de las aplicaciones de alto rendimiento, kafka-python utilizará crc32c para el código nativo optimizado si está instalado. Consulte <https://kafka-python.readthedocs.io/en/master/install.html> para obtener instrucciones de instalación. Consulte https://pypi.org/project/crc32c/ para obtener detalles sobre la biblioteca crc32c subyacente.
Un objetivo secundario de kafka-python es proporcionar una capa de protocolo fácil de usar para interactuar con los corredores de Kafka a través de la respuesta de Python. Esto es útil para pruebas, sondeos y experimentación general. El soporte del protocolo se aprovecha para habilitar un método KafkaClient.check_version() que sondea un broker Kafka e intenta identificar qué versión está ejecutando (0.8.0 a 2.6+).