릴리스 관련 문제로 인해 당분간 https://github.com/wbarnha/kafka-python-ng를 사용하는 것이 좋습니다.
Apache Kafka 분산 스트림 처리 시스템용 Python 클라이언트입니다. kafka-python은 Python 인터페이스(예: 소비자 반복자)를 추가하여 공식 Java 클라이언트와 매우 유사하게 작동하도록 설계되었습니다.
kafka-python은 최신 브로커(0.9+)와 함께 사용하는 것이 가장 좋지만 이전 버전(0.8.0까지)과도 역호환됩니다. 일부 기능은 최신 브로커에서만 활성화됩니다. 예를 들어 완전히 조정된 소비자 그룹(예: 동일한 그룹의 여러 소비자에 대한 동적 파티션 할당)에는 0.9+ kafka 브로커를 사용해야 합니다. 이전 브로커 릴리스에 대해 이 기능을 지원하려면 사용자 정의 리더십 선출 및 멤버십/상태 확인 코드(아마도 사육사 또는 영사 사용)를 작성하고 유지 관리해야 합니다. 이전 브로커의 경우 Chef, Ansible 등과 같은 구성 관리 도구를 사용하여 각 소비자 인스턴스에 서로 다른 파티션을 수동으로 할당하여 유사한 결과를 얻을 수 있습니다. 이 접근 방식은 제대로 작동하지만 오류 시 재조정을 지원하지 않습니다. 자세한 내용은 <https://kafka-python.readthedocs.io/en/master/compatibility.html>을 참조하세요.
마스터 브랜치에는 아직 출시되지 않은 기능이 포함될 수 있습니다. 릴리스 문서는 readthedocs 및/또는 Python의 인라인 도움말을 참조하세요.
>>> pip install kafka - python
KafkaConsumer는 공식 Java 클라이언트와 최대한 유사하게 작동하도록 고안된 고급 메시지 소비자입니다. 조정된 소비자 그룹을 완벽하게 지원하려면 그룹 API(kafka v0.9+)를 지원하는 kafka 브로커를 사용해야 합니다.
API 및 구성 세부정보는 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>을 참조하세요.
소비자 반복자는 주제, 파티션, 오프셋, 키 및 값과 같은 기본 메시지 속성을 노출하는 간단한 명명된 튜플인 ConsumerRecords를 반환합니다.
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer는 높은 수준의 비동기 메시지 생성자입니다. 이 클래스는 공식 Java 클라이언트와 최대한 유사하게 작동하도록 만들어졌습니다. 자세한 내용은 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>을 참조하세요.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
KafkaProducer는 KafkaConsumer와 달리 문제 없이 스레드 간에 사용할 수 있습니다.
스레드 로컬 방식으로 KafkaConsumer를 사용할 수 있지만 다중 처리가 권장됩니다.
kafka-python은 다음 압축 형식을 지원합니다:
gzip은 기본적으로 지원되지만 나머지는 추가 라이브러리를 설치해야 합니다. 자세한 내용은 <https://kafka-python.readthedocs.io/en/master/install.html>을 참조하세요.
Kafka는 CRC32 체크섬을 사용하여 메시지의 유효성을 검사합니다. kafka-python에는 호환성을 위해 순수 Python 구현이 포함되어 있습니다. 처리량이 많은 애플리케이션의 성능을 향상시키기 위해 kafka-python은 설치된 경우 최적화된 네이티브 코드에 crc32c를 사용합니다. 설치 지침은 <https://kafka-python.readthedocs.io/en/master/install.html>을 참조하세요. 기본 crc32c lib에 대한 자세한 내용은 https://pypi.org/project/crc32c/를 참조하세요.
kafka-python의 두 번째 목표는 Python repl을 통해 kafka 브로커와 상호 작용하기 위한 사용하기 쉬운 프로토콜 계층을 제공하는 것입니다. 이는 테스트, 프로빙 및 일반 실험에 유용합니다. 프로토콜 지원을 활용하여 kafka 브로커를 조사하고 실행 중인 버전(0.8.0 ~ 2.6+)을 식별하려고 시도하는 KafkaClient.check_version() 메서드를 활성화합니다.