由於發布問題,建議暫時使用 https://github.com/wbarnha/kafka-python-ng
Apache Kafka 分散式串流處理系統的 Python 用戶端。 kafka-python 的設計功能與官方 java 用戶端非常相似,並帶有一些 pythonic 介面(例如,消費者迭代器)。
kafka-python 最適合與較新的代理(0.9+)一起使用,但向後相容舊版本(至 0.8.0)。某些功能僅在較新的經紀商上啟用。例如,完全協調的消費者群組——即動態分區分配給同一組中的多個消費者——需要使用0.9+的kafka代理程式。要為早期代理版本支援此功能,需要編寫和維護自訂領導選舉和成員/健康檢查程式碼(可能使用 Zookeeper 或 consul)。對於較舊的代理,您可以透過使用Chef、ansible 等組態管理工具手動為每個消費者實例分配不同的分區來實現類似的效果。重新平衡。有關更多詳細信息,請參閱 <https://kafka-python.readthedocs.io/en/master/compatibility.html>。
請注意,主分支可能包含未發布的功能。有關發布文檔,請參閱 readthedocs 和/或 python 的內聯說明。
>>> pip install kafka - python
KafkaConsumer 是一個高級訊息消費者,旨在盡可能與官方 java 用戶端操作類似。對協調消費者組的全面支援需要使用支援組 API 的 kafka 代理:kafka v0.9+。
有關 API 和配置詳細信息,請參閱 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>。
消費者迭代器傳回 ConsumerRecords,它們是公開基本訊息屬性的簡單命名元組:主題、分割區、偏移量、鍵和值:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer 是一個高級非同步訊息生產者。該類別的運作方式與官方 java 用戶端盡可能相似。有關更多詳細信息,請參閱 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>。
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
KafkaProducer 可以毫無問題地跨線程使用,而 KafkaConsumer 則不能。
雖然可以以線程本地方式使用 KafkaConsumer,但建議使用多處理。
kafka-python 支援以下壓縮格式:
gzip 是原生支援的,其他的需要安裝額外的函式庫。有關更多信息,請參閱 <https://kafka-python.readthedocs.io/en/master/install.html>。
Kafka 使用 CRC32 校驗和來驗證訊息。 kafka-python 包含一個純 python 實作以實現相容性。為了提高高吞吐量應用程式的效能,kafka-python 將使用 crc32c 來優化本機程式碼(如果已安裝)。有關安裝說明,請參閱 <https://kafka-python.readthedocs.io/en/master/install.html>。有關底層 crc32c 庫的詳細信息,請參閱 https://pypi.org/project/crc32c/。
kafka-python 的第二個目標是提供一個易於使用的協定層,用於透過 python repl 與 kafka 代理進行互動。這對於測試、探測和一般實驗很有用。利用協定支援來啟用 KafkaClient.check_version() 方法,該方法探測 kafka 代理並嘗試識別它正在運行的版本(0.8.0 到 2.6+)。