Pustaka ini sudah tidak digunakan lagi dan tidak lagi dikelola atau didukung. Proyek komunitas yang aktif saat ini dapat ditemukan di https://github.com/faust-streaming/faust
Versi: | 1.10.4 |
---|---|
jaringan: | http://faust.readthedocs.io/ |
Unduh: | http://pypi.org/project/faust |
Sumber: | http://github.com/robinhood/faust |
Kata kunci: | didistribusikan, streaming, async, pemrosesan, data, antrian, manajemen negara |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust adalah perpustakaan pemrosesan aliran, memindahkan ide dari Kafka Streams ke Python.
Ini digunakan di Robinhood untuk membangun sistem terdistribusi berkinerja tinggi dan saluran data real-time yang memproses miliaran peristiwa setiap hari.
Faust menyediakan pemrosesan aliran dan pemrosesan peristiwa , berbagi kesamaan dengan alat seperti Kafka Streams, Apache Spark/Storm/Samza/Flink,
Itu tidak menggunakan DSL, itu hanya Python! Ini berarti Anda dapat menggunakan semua pustaka Python favorit Anda saat pemrosesan streaming: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++
Faust memerlukan Python 3.6 atau lebih baru untuk sintaks async/await baru, dan anotasi tipe variabel.
Berikut ini contoh pemrosesan aliran pesanan masuk:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
Dekorator Agen mendefinisikan "pemroses aliran" yang pada dasarnya menggunakan topik Kafka dan melakukan sesuatu untuk setiap peristiwa yang diterimanya.
Agen adalah fungsi async def
, sehingga juga dapat melakukan operasi lain secara asinkron, seperti permintaan web.
Sistem ini dapat mempertahankan keadaan, bertindak seperti database. Tabel diberi nama penyimpanan kunci/nilai terdistribusi yang dapat Anda gunakan sebagai kamus Python biasa.
Tabel disimpan secara lokal di setiap mesin menggunakan database tertanam super cepat yang ditulis dalam C++, yang disebut RocksDB.
Tabel juga dapat menyimpan jumlah agregat yang bersifat opsional "berjendela" sehingga Anda dapat melacak "jumlah klik dari hari terakhir", atau "jumlah klik dalam satu jam terakhir". Misalnya. Seperti Kafka Streams, kami mendukung jendela waktu yang berjatuhan, melompat, dan menggeser, dan jendela lama dapat kedaluwarsa untuk menghentikan pengisian data.
Untuk keandalan, kami menggunakan topik Kafka sebagai "log tulis di depan". Setiap kali kunci diubah, kami mempublikasikannya ke log perubahan. Node siaga menggunakan log perubahan ini untuk menyimpan replika data yang tepat dan memungkinkan pemulihan instan jika salah satu node gagal.
Bagi pengguna, tabel hanyalah sebuah kamus, namun data tetap ada di antara restart dan direplikasi di seluruh node sehingga ketika failover, node lain dapat mengambil alih secara otomatis.
Anda dapat menghitung tampilan halaman berdasarkan URL:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Data yang dikirim ke topik Kafka dipartisi, yang berarti klik akan dibagi berdasarkan URL sedemikian rupa sehingga setiap penghitungan untuk URL yang sama akan dikirimkan ke instance pekerja Faust yang sama.
Faust mendukung semua jenis data aliran: byte, Unicode, dan struktur serial, tetapi juga dilengkapi dengan "Model" yang menggunakan sintaksis Python modern untuk menjelaskan bagaimana kunci dan nilai dalam aliran diserialkan:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust diketik secara statis, menggunakan pemeriksa tipe mypy
, sehingga Anda dapat memanfaatkan tipe statis saat menulis aplikasi.
Kode sumber Faust berukuran kecil, terorganisir dengan baik, dan berfungsi sebagai sumber yang baik untuk mempelajari implementasi Kafka Streams.
Faust sangat mudah digunakan. Untuk mulai menggunakan solusi pemrosesan aliran lainnya, Anda memiliki proyek dan persyaratan infrastruktur yang rumit. Faust hanya membutuhkan Kafka, selebihnya hanya Python, jadi jika Anda tahu Python, Anda sudah bisa menggunakan Faust untuk melakukan pemrosesan streaming, dan bisa berintegrasi dengan apa saja.
Berikut salah satu aplikasi mudah yang dapat Anda buat:
impor faust salam kelas (faust.Record): dari nama_: str ke_nama: str app = faust.App('hello-app', broker='kafka://localhost') topik = app.topic('halo-topik', value_type=Salam) @app.agent(topik) async def halo(salam): async untuk salam dalam salam: print(f'Halo dari {greeting.from_name} hingga {greeting.to_name}') @aplikasi.timer(interval=1.0) async def example_sender(aplikasi): tunggu halo.kirim( nilai=Salam(dari_nama='Faust', ke_nama='kamu'), ) jika __nama__ == '__utama__': aplikasi.utama()
Anda mungkin sedikit terintimidasi oleh kata kunci async dan menunggu, tetapi Anda tidak perlu tahu cara kerja asyncio
untuk menggunakan Faust: cukup tiru contohnya, dan Anda akan baik-baik saja.
Contoh aplikasi memulai dua tugas: satu memproses aliran, yang lainnya adalah thread latar belakang yang mengirimkan peristiwa ke aliran tersebut. Dalam aplikasi kehidupan nyata, sistem Anda akan menerbitkan peristiwa ke topik Kafka yang dapat digunakan oleh prosesor Anda, dan thread latar belakang hanya diperlukan untuk memasukkan data ke dalam contoh kita.
Anda dapat menginstal Faust melalui Python Package Index (PyPI) atau dari sumber.
Untuk menginstal menggunakan pip:
$ pip install -U faust
Faust juga mendefinisikan sekelompok ekstensi setuptools
yang dapat digunakan untuk menginstal Faust dan dependensi untuk fitur tertentu.
Anda dapat menentukannya dalam kebutuhan Anda atau pada baris perintah pip
dengan menggunakan tanda kurung. Pisahkan beberapa bundel menggunakan koma:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
Paket berikut tersedia:
faust[rocksdb] : | untuk menggunakan RocksDB untuk menyimpan status tabel Faust. Direkomendasikan dalam produksi. |
---|
faust[redis] : | untuk menggunakan Redis_ sebagai backend caching sederhana (gaya Memcached). |
---|
faust[yaml] : | untuk menggunakan YAML dan perpustakaan PyYAML di stream. |
---|
faust[fast] : | untuk menginstal semua ekstensi percepatan C yang tersedia ke inti Faust. |
---|
faust[datadog] : | untuk menggunakan monitor Datadog Faust. |
---|---|
faust[statsd] : | untuk menggunakan monitor Statsd Faust. |
faust[uvloop] : | untuk menggunakan Faust dengan uvloop . |
---|---|
faust[eventlet] : | untuk menggunakan Faust dengan eventlet |
faust[debug] : | untuk menggunakan aiomonitor untuk menghubungkan dan men-debug pekerja Faust yang sedang berjalan. |
---|---|
faust[setproctitle] : | ketika modul setproctitle diinstal, pekerja Faust akan menggunakannya untuk menetapkan nama proses yang lebih bagus di ps /daftar top . Juga diinstal dengan bundel fast dan debug . |
Unduh Faust versi terbaru dari http://pypi.org/project/faust
Anda dapat menginstalnya dengan melakukan:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
Perintah terakhir harus dijalankan sebagai pengguna yang memiliki hak istimewa jika Anda saat ini tidak menggunakan virtualenv.
Anda dapat menginstal snapshot Faust terbaru menggunakan perintah pip
berikut:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
Ya! Gunakan eventlet
sebagai jembatan untuk berintegrasi dengan asyncio
.
eventlet
Pendekatan ini berfungsi dengan pustaka Python pemblokiran apa pun yang dapat bekerja dengan eventlet
.
Menggunakan eventlet
mengharuskan Anda menginstal modul aioeventlet
, dan Anda dapat menginstalnya sebagai bundel bersama dengan Faust:
$ pip install -U faust[eventlet]
Kemudian untuk benar-benar menggunakan eventlet sebagai event loop Anda harus menggunakan argumen -L <faust --loop>
ke program faust
:
$ faust -L eventlet -A myproj worker -l info
atau tambahkan import mode.loop.eventlet
di bagian atas skrip titik masuk Anda:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
Peringatan
Sangat penting bahwa ini ada di bagian paling atas modul, dan dijalankan sebelum Anda mengimpor perpustakaan.
Ya! Gunakan jembatan tornado.platform.asyncio
: http://www.tornadoweb.org/en/stable/asyncio.html
Ya! Gunakan implementasi reaktor asyncio
: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
Tidak. Faust memerlukan Python 3.6 atau lebih baru, karena Faust banyak menggunakan fitur yang diperkenalkan di Python 3.6 (async, menunggu, anotasi tipe variabel).
Anda mungkin perlu menambah batas jumlah maksimum file yang terbuka. Posting berikut menjelaskan cara melakukannya di OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust mendukung kafka dengan versi >= 0.10.
Untuk diskusi tentang penggunaan, pengembangan, dan masa depan Faust, silakan bergabung dengan `fauststream`_ Slack.
Jika Anda memiliki saran, laporan bug, atau gangguan, silakan laporkan ke pelacak masalah kami di https://github.com/robinhood/faust/issues/
Perangkat lunak ini dilisensikan di bawah Lisensi BSD Baru. Lihat file LICENSE
di direktori distribusi teratas untuk teks lisensi lengkap.
Pengembangan Faust terjadi di GitHub: https://github.com/robinhood/faust
Anda sangat dianjurkan untuk berpartisipasi dalam pengembangan Faust.
Pastikan juga membaca bagian Berkontribusi pada Faust di dokumentasi.
Setiap orang yang berinteraksi dalam basis kode proyek, pelacak masalah, ruang obrolan, dan milis diharapkan mengikuti Kode Etik Faust.
Sebagai kontributor dan pengelola proyek-proyek ini, dan demi membina komunitas yang terbuka dan ramah, kami berjanji untuk menghormati semua orang yang berkontribusi melalui pelaporan masalah, memposting permintaan fitur, memperbarui dokumentasi, mengirimkan permintaan penarikan atau patch, dan aktivitas lainnya.
Kami berkomitmen untuk membuat partisipasi dalam proyek-proyek ini menjadi pengalaman bebas pelecehan bagi semua orang, tanpa memandang tingkat pengalaman, gender, identitas dan ekspresi gender, orientasi seksual, disabilitas, penampilan pribadi, ukuran tubuh, ras, etnis, usia, agama, atau kebangsaan.
Contoh perilaku yang tidak dapat diterima oleh peserta meliputi:
Pengelola proyek mempunyai hak dan tanggung jawab untuk menghapus, mengedit, atau menolak komentar, komitmen, kode, pengeditan wiki, masalah, dan kontribusi lainnya yang tidak selaras dengan Kode Etik ini. Dengan mengadopsi Kode Etik ini, pengelola proyek berkomitmen untuk menerapkan prinsip-prinsip ini secara adil dan konsisten pada setiap aspek pengelolaan proyek ini. Pengelola proyek yang tidak mengikuti atau menegakkan Kode Etik dapat dikeluarkan secara permanen dari tim proyek.
Kode etik ini berlaku baik di dalam ruang proyek maupun di ruang publik ketika seseorang mewakili proyek atau komunitasnya.
Contoh perilaku kasar, melecehkan, atau perilaku yang tidak dapat diterima dapat dilaporkan dengan membuka masalah atau menghubungi satu atau lebih pengelola proyek.
Kode Etik ini diadaptasi dari Perjanjian Kontributor, versi 1.2.0 yang tersedia di http://contributor-covenant.org/version/1/2/0/.