由于发布问题,建议暂时使用 https://github.com/wbarnha/kafka-python-ng
Apache Kafka 分布式流处理系统的 Python 客户端。 kafka-python 的设计功能与官方 java 客户端非常相似,带有一些 pythonic 接口(例如,消费者迭代器)。
kafka-python 最适合与较新的代理(0.9+)一起使用,但向后兼容旧版本(至 0.8.0)。某些功能仅在较新的经纪商上启用。例如,完全协调的消费者组——即动态分区分配给同一组中的多个消费者——需要使用0.9+的kafka代理。要为早期代理版本支持此功能,需要编写和维护自定义领导选举和成员/健康检查代码(可能使用 Zookeeper 或 consul)。对于较旧的代理,您可以通过使用 Chef、ansible 等配置管理工具手动为每个消费者实例分配不同的分区来实现类似的效果。这种方法可以很好地工作,尽管它不支持故障时的重新平衡。有关更多详细信息,请参阅 <https://kafka-python.readthedocs.io/en/master/compatibility.html>。
请注意,主分支可能包含未发布的功能。有关发布文档,请参阅 readthedocs 和/或 python 的内联帮助。
>>> pip install kafka - python
KafkaConsumer 是一个高级消息消费者,旨在尽可能与官方 java 客户端操作类似。对协调消费者组的全面支持需要使用支持组 API 的 kafka 代理:kafka v0.9+。
有关 API 和配置详细信息,请参阅 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>。
消费者迭代器返回 ConsumerRecords,它们是公开基本消息属性的简单命名元组:主题、分区、偏移量、键和值:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer 是一个高级异步消息生产者。该类的运行方式与官方 java 客户端尽可能相似。有关更多详细信息,请参阅 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>。
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
KafkaProducer 可以毫无问题地跨线程使用,而 KafkaConsumer 则不能。
虽然可以以线程本地方式使用 KafkaConsumer,但建议使用多处理。
kafka-python 支持以下压缩格式:
gzip 是原生支持的,其他的需要安装额外的库。有关更多信息,请参阅 <https://kafka-python.readthedocs.io/en/master/install.html>。
Kafka 使用 CRC32 校验和来验证消息。 kafka-python 包含一个纯 python 实现以实现兼容性。为了提高高吞吐量应用程序的性能,kafka-python 将使用 crc32c 来优化本机代码(如果已安装)。有关安装说明,请参阅 <https://kafka-python.readthedocs.io/en/master/install.html>。有关底层 crc32c 库的详细信息,请参阅 https://pypi.org/project/crc32c/。
kafka-python 的第二个目标是提供一个易于使用的协议层,用于通过 python repl 与 kafka 代理进行交互。这对于测试、探测和一般实验很有用。利用协议支持来启用 KafkaClient.check_version() 方法,该方法探测 kafka 代理并尝试识别它正在运行的版本(0.8.0 到 2.6+)。