このライブラリは非推奨となり、管理もサポートも行われなくなりました。現在アクティブなコミュニティ プロジェクトは、https://github.com/faust-streaming/faust で見つけることができます。
バージョン: | 1.10.4 |
---|---|
ウェブ: | http://faust.readthedocs.io/ |
ダウンロード: | http://pypi.org/project/faust |
ソース: | http://github.com/robinhood/faust |
キーワード: | 分散、ストリーム、非同期、処理、データ、キュー、状態管理 |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust は、Kafka Streams のアイデアを Python に移植したストリーム処理ライブラリです。
Robinhood では、毎日数十億のイベントを処理する高性能分散システムとリアルタイム データ パイプラインを構築するために使用されています。
Faust はストリーム処理とイベント処理の両方を提供し、Kafka Streams、Apache Spark/Storm/Samza/Flink などのツールと類似性を共有します。
DSL は使用せず、Python だけを使用します。これは、ストリーム処理時にお気に入りの Python ライブラリ (NumPy、PyTorch、Pandas、NLTK、Django、Flask、SQLAlchemy、++) をすべて使用できることを意味します。
Faust では、新しい async/await 構文と変数型の注釈を使用するには、Python 3.6 以降が必要です。
受信注文のストリームを処理する例を次に示します。
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
エージェント デコレータは、基本的に Kafka トピックから消費し、受信したすべてのイベントに対して何らかの処理を行う「ストリーム プロセッサ」を定義します。
エージェントはasync def
関数であるため、Web リクエストなどの他の操作を非同期で実行することもできます。
このシステムは状態を保持し、データベースのように機能します。テーブルは分散キー/値ストアと呼ばれ、通常の Python 辞書として使用できます。
テーブルは、RocksDB と呼ばれる C++ で書かれた超高速組み込みデータベースを使用して各マシンにローカルに保存されます。
テーブルには、オプションで「ウィンドウ化」された集計カウントを保存することもできるため、「前日のクリック数」または「過去 1 時間のクリック数」を追跡できます。例えば。 Kafka ストリームと同様に、時間のタンブリング、ホッピング、スライディング ウィンドウをサポートしており、データがいっぱいになるのを防ぐために古いウィンドウを期限切れにすることができます。
信頼性を高めるために、Kafka トピックを「先行書き込みログ」として使用します。キーが変更されるたびに、変更ログに公開されます。スタンバイ ノードは、この変更ログを使用してデータの正確なレプリカを保持し、ノードのいずれかに障害が発生した場合の即時回復を可能にします。
ユーザーにとってテーブルは単なる辞書ですが、データは再起動間で保持され、ノード間でレプリケートされるため、フェールオーバー時に他のノードが自動的に引き継ぐことができます。
URL ごとにページビューをカウントできます。
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Kafka トピックに送信されるデータはパーティション化されます。つまり、同じ URL のすべてのカウントが同じ Faust ワーカー インスタンスに配信されるように、クリックが URL ごとに分割されます。
Faust は、バイト、Unicode、シリアル化された構造など、あらゆるタイプのストリーム データをサポートしていますが、最新の Python 構文を使用してストリーム内のキーと値がどのようにシリアル化されるかを記述する「モデル」も付属しています。
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust はmypy
型チェッカーを使用して静的に型指定されるため、アプリケーションを作成するときに静的型を利用できます。
Faust のソース コードは小さく、よく整理されており、Kafka Streams の実装を学習するための優れたリソースとして機能します。
ファウストは非常に使いやすいです。他のストリーム処理ソリューションの使用を開始するには、複雑な hello-world プロジェクトとインフラストラクチャ要件が必要です。 Faust に必要なのは Kafka のみで、残りは Python だけです。そのため、Python を知っていれば、すでに Faust を使用してストリーム処理を行うことができ、ほぼあらゆるものと統合できます。
以下に、より簡単に作成できるアプリケーションの 1 つを示します。
ファウストを輸入する クラス挨拶(faust.Record): 差出人名: str to_name: str app = faust.App('hello-app', Broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting) @app.agent(トピック) async def hello(挨拶): 挨拶中の挨拶の非同期: print(f'こんにちは、{greeting.from_name} から {greeting.to_name} まで') @app.timer(間隔=1.0) 非同期デフォルト example_sender(app): 待ってください、こんにちは。送信( value=挨拶(from_name='ファウスト', to_name='あなた'), ) __name__ == '__main__'の場合: app.main()
おそらく、async キーワードと await キーワードに少し怖気づくかもしれませんが、Faust を使用するためにasyncio
どのように機能するかを知る必要はありません。例を真似するだけで大丈夫です。
サンプル アプリケーションは 2 つのタスクを開始します。1 つはストリームの処理で、もう 1 つはそのストリームにイベントを送信するバックグラウンド スレッドです。実際のアプリケーションでは、システムはプロセッサが消費できるイベントを Kafka トピックに発行します。バックグラウンド スレッドは、この例にデータをフィードするためにのみ必要です。
Faust は、Python Package Index (PyPI) 経由またはソースからインストールできます。
pip を使用してインストールするには:
$ pip install -U faust
Faust は、Faust と特定の機能の依存関係をインストールするために使用できるsetuptools
拡張機能のグループも定義します。
これらは、要件内で指定することも、 pip
コマンドラインで括弧を使用して指定することもできます。複数のバンドルはカンマを使用して区切ります。
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
次のバンドルが利用可能です。
faust[rocksdb] : | ファウストテーブルの状態を保存するために RocksDB を使用するため。 本番環境で推奨されます。 |
---|
faust[redis] : | Redis_ を単純なキャッシュ バックエンド (Memcached スタイル) として使用するため。 |
---|
faust[yaml] : | ストリームで YAML とPyYAML ライブラリを使用するためのものです。 |
---|
faust[fast] : | 利用可能なすべての C スピードアップ拡張機能を Faust コアにインストールします。 |
---|
faust[datadog] : | Datadog Faust モニターを使用するため。 |
---|---|
faust[statsd] : | Statsd Faust モニターを使用するため。 |
faust[uvloop] : | uvloop で Faust を使用するため。 |
---|---|
faust[eventlet] : | eventlet でファウストを使用する場合 |
faust[debug] : | aiomonitor 使用して実行中の Faust ワーカーに接続し、デバッグするため。 |
---|---|
faust[setproctitle] : | setproctitle モジュールがインストールされると、Faust ワーカーはそれを使用して、 ps / top リストにより適切なプロセス名を設定します。 fast バンドルとdebug バンドルでもインストールされます。 |
http://pypi.org/project/faust からファウストの最新バージョンをダウンロードします。
次のようにしてインストールできます。
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
現在 virtualenv を使用していない場合は、最後のコマンドを特権ユーザーとして実行する必要があります。
次のpip
コマンドを使用して、Faust の最新のスナップショットをインストールできます。
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
はい! eventlet
ブリッジとして使用して、 asyncio
と統合します。
eventlet
の使用このアプローチは、 eventlet
で動作するブロッキング Python ライブラリで動作します。
eventlet
を使用するには、 aioeventlet
モジュールをインストールする必要があります。これは、Faust とともにバンドルとしてインストールできます。
$ pip install -U faust[eventlet]
次に、実際にイベントレットをイベント ループとして使用するには、 faust
プログラムに-L <faust --loop>
引数を使用する必要があります。
$ faust -L eventlet -A myproj worker -l info
または、エントリ ポイント スクリプトの先頭にimport mode.loop.eventlet
を追加します。
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
警告
これがモジュールの最上部にあり、ライブラリをインポートする前に実行されることが非常に重要です。
はい! tornado.platform.asyncio
ブリッジを使用します: http://www.tornadoweb.org/en/stable/asyncio.html
はい! asyncio
リアクター実装を使用します: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
いいえ。Faust は Python 3.6 で導入された機能 (async、await、変数型アノテーション) を多用するため、Python 3.6 以降が必要です。
開くファイルの最大数の制限を増やす必要がある場合があります。次の投稿では、OS X でその方法を説明しています: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust はバージョン 0.10 以上の kafka をサポートします。
Faust の使用法、開発、将来についてのディスカッションについては、 `fauststream`_ Slack に参加してください。
提案、バグレポート、または迷惑な点がある場合は、https://github.com/robinhood/faust/issues/ にある問題トラッカーに報告してください。
このソフトウェアは、New BSD License に基づいてライセンス供与されています。完全なライセンス テキストについては、最上位の配布ディレクトリにあるLICENSE
ファイルを参照してください。
Faust の開発は GitHub で行われます: https://github.com/robinhood/faust
ファウストの開発に参加することを強くお勧めします。
ドキュメントの「Faust への貢献」セクションも必ずお読みください。
プロジェクトのコード ベース、問題トラッカー、チャット ルーム、メーリング リストでやり取りするすべての人は、ファウストの行動規範に従うことが期待されます。
これらのプロジェクトの貢献者および管理者として、またオープンで歓迎的なコミュニティを育成するという観点から、私たちは問題の報告、機能リクエストの投稿、ドキュメントの更新、プル リクエストやパッチの送信、その他の活動を通じて貢献するすべての人々を尊重することを誓約します。
私たちは、経験のレベル、性別、性自認と性表現、性的指向、障害、外見、体の大きさ、人種、民族、年齢、宗教、性別に関係なく、すべての人がハラスメントのないプロジェクトに参加できるように努めます。国籍。
参加者による容認できない行為の例は次のとおりです。
プロジェクト管理者は、この行動規範に従わないコメント、コミット、コード、Wiki 編集、問題、その他の投稿を削除、編集、または拒否する権利と責任を有します。この行動規範を採用することにより、プロジェクト管理者は、このプロジェクト管理のあらゆる側面にこれらの原則を公正かつ一貫して適用することを約束します。行動規範に従わない、または強制しないプロジェクト管理者は、プロジェクト チームから永久に削除される場合があります。
この行動規範は、プロジェクト スペース内と、個人がプロジェクトまたはそのコミュニティを代表する場合の公共スペースの両方に適用されます。
虐待、嫌がらせ、またはその他の容認できない行為の事例は、問題を開くか、1 人以上のプロジェクト管理者に連絡することによって報告できます。
この行動規範は、http://contributor-covenant.org/version/1/2/0/ で入手できる Contributor Covenant バージョン 1.2.0 から改変されたものです。