リリースに問題があるため、当面は https://github.com/wbarnha/kafka-python-ng を使用することをお勧めします。
Apache Kafka 分散ストリーム処理システム用の Python クライアント。 kafka-python は、Python インターフェイス (コンシューマ イテレータなど) を散りばめて、公式の Java クライアントとほぼ同じように機能するように設計されています。
kafka-python は新しいブローカー (0.9 以降) で使用するのが最適ですが、古いバージョン (0.8.0 まで) との下位互換性があります。一部の機能は、新しいブローカーでのみ有効になります。たとえば、完全に調整されたコンシューマ グループ、つまり、同じグループ内の複数のコンシューマへの動的パーティション割り当てには、0.9 以上の Kafka ブローカの使用が必要です。以前のブローカー リリースでこの機能をサポートするには、カスタムのリーダーシップ選出およびメンバーシップ / ヘルス チェック コード (おそらく Zookeeper または Consul を使用) を作成して維持する必要があります。古いブローカーの場合は、シェフ、アンシブルなどの構成管理ツールを使用して、各コンシューマ インスタンスに異なるパーティションを手動で割り当てることで、同様のことを実現できます。このアプローチは問題なく機能しますが、障害時のリバランスはサポートされていません。詳細については、<https://kafka-python.readthedocs.io/en/master/compatibility.html> を参照してください。
master ブランチには未リリースの機能が含まれている可能性があることに注意してください。リリース ドキュメントについては、readthedocs および/または Python のインライン ヘルプを参照してください。
>>> pip install kafka - python
KafkaConsumer は高レベルのメッセージ コンシューマーであり、公式の Java クライアントとできるだけ同じように動作することを目的としています。調整されたコンシューマ グループを完全にサポートするには、グループ API をサポートする kafka ブローカー (kafka v0.9+) を使用する必要があります。
API と設定の詳細については、<https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> を参照してください。
コンシューマ反復子は ConsumerRecords を返します。これは、基本的なメッセージ属性 (トピック、パーティション、オフセット、キー、値) を公開する単純な名前付きタプルです。
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer は、高レベルの非同期メッセージ プロデューサーです。このクラスは、公式の Java クライアントとできるだけ同じように動作することを目的としています。詳細については、<https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> を参照してください。
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
KafkaProducer はスレッド間で問題なく使用できますが、KafkaConsumer は使用できません。
KafkaConsumer をスレッドローカルな方法で使用することは可能ですが、マルチプロセッシングをお勧めします。
kafka-python は次の圧縮形式をサポートしています。
gzip はネイティブでサポートされていますが、その他の場合は追加のライブラリをインストールする必要があります。詳細については、<https://kafka-python.readthedocs.io/en/master/install.html> を参照してください。
Kafka は CRC32 チェックサムを使用してメッセージを検証します。 kafka-python には、互換性のために純粋な Python 実装が含まれています。高スループットのアプリケーションのパフォーマンスを向上させるために、kafka-python がインストールされている場合は、最適化されたネイティブ コードに crc32c を使用します。インストール手順については、<https://kafka-python.readthedocs.io/en/master/install.html> を参照してください。基礎となる crc32c ライブラリの詳細については、https://pypi.org/project/crc32c/ を参照してください。
kafka-python の 2 番目の目標は、Python repl を介して Kafka ブローカーと対話するための使いやすいプロトコル層を提供することです。これは、テスト、調査、および一般的な実験に役立ちます。プロトコル サポートは、Kafka ブローカーを調査し、実行されているバージョン (0.8.0 から 2.6 以降) を識別しようとする KafkaClient.check_version() メソッドを有効にするために利用されます。