KARENA MASALAH DENGAN RILIS, DISARANKAN UNTUK MENGGUNAKAN https://github.com/wbarnha/kafka-python-ng UNTUK SAAT INI
Klien Python untuk sistem pemrosesan aliran terdistribusi Apache Kafka. kafka-python dirancang untuk berfungsi seperti klien java resmi, dengan tambahan antarmuka pythonic (misalnya, iterator konsumen).
kafka-python paling baik digunakan dengan broker baru (0.9+), tetapi kompatibel dengan versi lama (hingga 0.8.0). Beberapa fitur hanya akan diaktifkan pada broker baru. Misalnya, grup konsumen yang terkoordinasi sepenuhnya -- yaitu, penetapan partisi dinamis ke beberapa konsumen dalam grup yang sama -- memerlukan penggunaan 0,9+ broker kafka. Mendukung fitur ini untuk rilis broker sebelumnya memerlukan penulisan dan pemeliharaan pemilihan kepemimpinan khusus dan kode keanggotaan/pemeriksaan kesehatan (mungkin menggunakan penjaga kebun binatang atau konsul). Untuk broker lama, Anda dapat mencapai hal serupa dengan secara manual menetapkan partisi berbeda ke setiap instance konsumen dengan alat manajemen konfigurasi seperti chef, ansible, dll. Pendekatan ini akan berfungsi dengan baik, meskipun tidak mendukung penyeimbangan ulang jika terjadi kegagalan. Lihat <https://kafka-python.readthedocs.io/en/master/compatibility.html> untuk detail selengkapnya.
Harap dicatat bahwa cabang master mungkin berisi fitur yang belum dirilis. Untuk dokumentasi rilis, silakan lihat bantuan inline readthedocs dan/atau python.
>>> pip install kafka - python
KafkaConsumer adalah konsumen pesan tingkat tinggi, yang dimaksudkan untuk beroperasi semirip mungkin dengan klien java resmi. Dukungan penuh untuk kelompok konsumen yang terkoordinasi memerlukan penggunaan broker kafka yang mendukung API Grup: kafka v0.9+.
Lihat <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> untuk detail API dan konfigurasi.
Iterator konsumen mengembalikan ConsumerRecords, yang merupakan tupel bernama sederhana yang mengekspos atribut pesan dasar: topik, partisi, offset, kunci, dan nilai:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' )
>>> for msg in consumer:
... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer( ' my_favorite_topic ' , group_id = ' my_favorite_group ' )
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer( bootstrap_servers = ' localhost:1234 ' )
>>> consumer.assign([TopicPartition( ' foobar ' , 2 )])
>>> msg = next (consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer( value_deserializer = msgpack.loads)
>>> consumer.subscribe([ ' msgpackfoo ' ])
>>> for msg in consumer:
... assert isinstance (msg.value, dict )
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
KafkaProducer adalah produser pesan asinkron tingkat tinggi. Kelas ini dimaksudkan untuk beroperasi semirip mungkin dengan klien java resmi. Lihat <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html> untuk detail selengkapnya.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer( bootstrap_servers = ' localhost:1234 ' )
>>> for _ in range ( 100 ):
... producer.send( ' foobar ' , b ' some_message_bytes ' )
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send( ' foobar ' , b ' another_message ' )
>>> result = future.get( timeout = 60 )
>>> # Block until all pending messages are at least put on the network
>>> # NOTE : This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send( ' foobar ' , key = b ' foo ' , value = b ' bar ' )
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer( value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
>>> producer.send( ' fizzbuzz ' , { ' foo ' : ' bar ' })
>>> # Serialize string keys
>>> producer = KafkaProducer( key_serializer = str .encode)
>>> producer.send( ' flipflap ' , key = ' ping ' , value = b ' 1234 ' )
>>> # Compress messages
>>> producer = KafkaProducer( compression_type = ' gzip ' )
>>> for i in range ( 1000 ):
... producer.send( ' foobar ' , b ' msg %d ' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send( ' foobar ' , value = b ' c29tZSB2YWx1ZQ== ' , headers = [( ' content-encoding ' , b ' base64 ' )])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
KafkaProducer dapat digunakan di seluruh thread tanpa masalah, tidak seperti KafkaConsumer yang tidak bisa.
Meskipun KafkaConsumer dapat digunakan dengan cara thread-lokal, disarankan untuk melakukan multiproses.
kafka-python mendukung format kompresi berikut:
gzip didukung secara asli, yang lain memerlukan instalasi perpustakaan tambahan. Lihat <https://kafka-python.readthedocs.io/en/master/install.html> untuk informasi selengkapnya.
Kafka menggunakan checksum CRC32 untuk memvalidasi pesan. kafka-python menyertakan implementasi python murni untuk kompatibilitas. Untuk meningkatkan kinerja aplikasi dengan throughput tinggi, kafka-python akan menggunakan crc32c untuk kode asli yang dioptimalkan jika diinstal. Lihat <https://kafka-python.readthedocs.io/en/master/install.html> untuk petunjuk instalasi. Lihat https://pypi.org/project/crc32c/ untuk detail tentang lib crc32c yang mendasarinya.
Tujuan kedua kafka-python adalah menyediakan lapisan protokol yang mudah digunakan untuk berinteraksi dengan broker kafka melalui python repl. Ini berguna untuk pengujian, penyelidikan, dan eksperimen umum. Dukungan protokol dimanfaatkan untuk mengaktifkan metode KafkaClient.check_version() yang menyelidiki broker kafka dan mencoba mengidentifikasi versi mana yang dijalankan (0.8.0 hingga 2.6+).