該庫已被棄用,不再受管理或支援。目前活躍的社群專案可以在 https://github.com/faust-streaming/faust 找到
版本: | 1.10.4 |
---|---|
網址: | http://faust.readthedocs.io/ |
下載: | http://pypi.org/project/faust |
來源: | http://github.com/robinhood/faust |
關鍵字: | 分散式、流、非同步、處理、資料、佇列、狀態管理 |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust是一個串流處理函式庫,將 Kafka Streams 的想法移植到了 Python。
Robinhood 使用它來建立高性能分散式系統和即時資料管道,每天處理數十億個事件。
Faust 提供串流處理和事件處理,與 Kafka Streams、Apache Spark/Storm/Samza/Flink 等工具有相似之處,
它不使用 DSL,只是 Python!這意味著您可以在流處理時使用所有您喜歡的 Python 庫:NumPy、PyTorch、Pandas、NLTK、Django、Flask、SQLAlchemy、++
Faust 需要 Python 3.6 或更高版本才能使用新的 async/await 語法和變數類型註解。
以下是處理傳入訂單流的範例:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
Agent 裝飾器定義了一個“流處理器”,它本質上是從 Kafka 主題消費,並對其接收到的每個事件執行一些操作。
該代理程式是一個async def
函數,因此也可以非同步執行其他操作,例如 Web 請求。
這個系統可以持久保存狀態,就像資料庫一樣。表被命名為分散式鍵/值存儲,您可以將其用作常規 Python 字典。
表使用用 C++ 編寫的超快速嵌入式資料庫(稱為 RocksDB)本地儲存在每台電腦上。
表格還可以儲存可以選擇「視窗化」的聚合計數,以便您可以追蹤「最後一天的點擊次數」或「最後一小時的點擊次數」。例如。與 Kafka Streams 一樣,我們支援翻滾、跳躍和滑動時間窗口,並且舊窗口可以過期以阻止資料填滿。
為了可靠性,我們使用 Kafka 主題作為「預寫日誌」。每當金鑰發生變更時,我們都會發佈到變更日誌。備用節點使用此變更日誌來保留資料的精確副本,並在任何節點發生故障時啟用即時復原。
對於使用者來說,表只是一個字典,但資料在重新啟動之間保留並跨節點複製,因此在故障轉移時其他節點可以自動接管。
您可以按 URL 計算頁面瀏覽量:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
傳送到 Kafka 主題的資料是分區的,這表示點擊將按 URL 進行分片,這樣同一 URL 的每個計數都會傳遞到同一個 Faust 工作實例。
Faust 支援任何類型的流資料:位元組、Unicode 和序列化結構,但也附帶使用現代 Python 語法來描述流中的鍵和值如何序列化的「模型」:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust 是靜態類型的,使用mypy
類型檢查器,因此您可以在編寫應用程式時利用靜態類型。
Faust 原始碼很小,組織良好,是學習 Kafka Streams 實現的良好資源。
Faust 非常容易使用。要開始使用其他串流處理解決方案,您需要複雜的 hello-world 專案和基礎架構需求。 Faust 只需要 Kafka,其餘的只是 Python,所以如果你了解 Python,你已經可以使用 Faust 進行串流處理,並且它可以與幾乎任何東西整合。
這是您可以製作的更簡單的應用程式之一:
進口浮士德 問候語類別(faust.Record): 寄件者名稱:str 收件者名稱:str app = faust.App('hello-app',broker='kafka://localhost') topic = app.topic('hello-topic', value_type=問候語) @app.agent(主題) 非同步定義你好(問候語): 問候語中的非同步問候語: print(f'你好,從 {greeting.from_name} 到 {greeting.to_name}') @app.timer(間隔=1.0) 非同步 def example_sender(app): 等待你好.發送( value=問候語(from_name='浮士德', to_name='你'), ) 如果 __name__ == '__main__': 應用程式.main()
您可能對 async 和 wait 關鍵字有點害怕,但您不必知道asyncio
如何工作即可使用 Faust:只需模仿範例,就可以了。
此範例應用程式啟動兩個任務:一個是處理流,另一個是後台執行緒向該流發送事件。在現實應用程式中,您的系統將向 Kafka 主題發布事件,您的處理器可以從中使用這些事件,並且後台執行緒只需要將資料輸入到我們的範例中。
您可以透過 Python 套件索引 (PyPI) 或從原始碼安裝 Faust。
使用 pip 安裝:
$ pip install -U faust
Faust 還定義了一組setuptools
擴展,可用於安裝 Faust 以及給定功能的依賴項。
您可以在您的要求中或使用括號在pip
命令列上指定這些內容。使用逗號分隔多個套件:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
提供以下捆綁包:
faust[rocksdb] : | 使用 RocksDB 儲存 Faust 表狀態。 推薦用於生產。 |
---|
faust[redis] : | 使用 Redis_ 作為簡單的快取後端(Memcached 風格)。 |
---|
faust[yaml] : | 用於在流程中使用 YAML 和PyYAML 函式庫。 |
---|
faust[fast] : | 用於將所有可用的 C 加速擴充功能安裝到 Faust 核心。 |
---|
faust[datadog] : | 用於使用 Datadog Faust 監視器。 |
---|---|
faust[statsd] : | 用於使用 Statsd Faust 監視器。 |
faust[uvloop] : | 用於將 Faust 與uvloop 一起使用。 |
---|---|
faust[eventlet] : | 用於將 Faust 與eventlet 結合使用 |
faust[debug] : | 用於使用aiomonitor 連接和調試正在運行的 Faust 工作線程。 |
---|---|
faust[setproctitle] : | 安裝setproctitle 模組後,Faust 工作程式將使用它在ps / top 清單中設定更好的進程名稱。也隨fast 套件和debug 包一起安裝。 |
從 http://pypi.org/project/faust 下載最新版本的 Faust
您可以透過執行以下操作來安裝它:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
如果您目前未使用 virtualenv,則必須以特權使用者身分執行最後一個指令。
您可以使用以下pip
指令安裝 Faust 的最新快照:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
是的!使用eventlet
作為與asyncio
整合的橋樑。
eventlet
此方法適用於任何可與eventlet
配合使用的阻塞 Python 函式庫。
使用eventlet
需要安裝aioeventlet
模組,您可以將其與 Faust 一起作為捆綁包安裝:
$ pip install -U faust[eventlet]
然後,要實際使用 eventlet 作為事件循環,您必須使用faust
程式的-L <faust --loop>
參數:
$ faust -L eventlet -A myproj worker -l info
或在入口點腳本的頂部新增import mode.loop.eventlet
:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
警告
這是非常重要的,它位於模組的最頂部,並且它在導入庫之前執行。
是的!使用tornado.platform.asyncio
橋接器:http://www.tornadoweb.org/en/stable/asyncio.html
是的!使用asyncio
反應器實作:https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
不需要。
您可能需要增加開啟檔案的最大數量的限制。以下文章解釋如何在 OS X 上執行此操作:https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust 支援版本 >= 0.10 的 kafka。
有關 Faust 的使用、開發和未來的討論,請加入`fauststream`_ Slack。
如果您有任何建議、錯誤報告或煩惱,請將其報告給我們的問題追蹤器:https://github.com/robinhood/faust/issues/
該軟體根據新 BSD 許可證獲得許可。請參閱頂級分發目錄中的LICENSE
檔案以取得完整的許可證文字。
Faust 的開發在 GitHub 上進行:https://github.com/robinhood/faust
我們強烈鼓勵您參與 Faust 的開發。
請務必閱讀文件中的「為 Faust 做貢獻」部分。
在專案程式碼庫、問題追蹤器、聊天室和郵件清單中進行互動的每個人都應遵循 Faust 行為準則。
作為這些專案的貢獻者和維護者,為了培育一個開放和熱情的社區,我們承諾尊重所有透過報告問題、發布功能請求、更新文件、提交拉取請求或修補程式以及其他活動做出貢獻的人。
我們致力於為每個人提供無騷擾的參與這些計畫的體驗,無論其經驗水平、性別、性別認同和表達、性取向、殘疾、個人外表、體型、種族、民族、年齡、宗教或國籍。
參與者不可接受的行為範例包括:
專案維護者有權利和責任刪除、編輯或拒絕不符合本行為準則的評論、提交、程式碼、wiki 編輯、問題和其他貢獻。透過採用本行為準則,專案維護者承諾公平且一致地將這些原則應用於管理專案的各個方面。不遵守或執行行為準則的專案維護人員可能會被永久從專案團隊中除名。
當個人代表項目或其社群時,本行為準則適用於專案空間和公共空間。
可以透過提出問題或聯繫一名或多名專案維護人員來報告辱罵、騷擾或其他不可接受的行為。
本行為準則改編自《貢獻者契約》1.2.0 版,網址為 http://contributor-covenant.org/version/1/2/0/。