一個使用 Python 中的可觀察集合和查詢運算子函數編寫非同步和基於事件的程式的函式庫
對於 v3.X,請前往 v3 分支。
ReactiveX for Python v4.x 在 Python 3.7 或更高版本上運行。安裝:
pip3 install reactivex
ReactiveX for Python (RxPY) 是一個函式庫,用於在 Python 中使用可觀察序列和可管道查詢運算子來編寫非同步和基於事件的程式。使用 Rx,開發人員可以使用 Observables 表示非同步資料流,使用運算子查詢非同步資料流,並使用調度程序參數化資料/事件流中的並發性。
import reactivex as rx
from reactivex import operators as ops
source = rx . of ( "Alpha" , "Beta" , "Gamma" , "Delta" , "Epsilon" )
composed = source . pipe (
ops . map ( lambda s : len ( s )),
ops . filter ( lambda i : i >= 5 )
)
composed . subscribe ( lambda value : print ( "Received {0}" . format ( value )))
閱讀文件以了解 ReactiveX 的原理並取得可用運算子的完整參考。
如果您需要從 RxPY v1.x 或 v3.x 遷移程式碼,請閱讀遷移部分。
此處也提供了第三方文件清單。
加入 GitHub 討論上的對話!如果您有任何疑問或建議。
ReactiveX for Python 是一個相當完整的 Rx 實現,擁有超過 120 個運算符,以及超過 1300 個通過的單元測試。 RxPY 主要是 RxJS 的直接移植,但在線程和阻塞運算子方面也藉鑒了 Rx.NET 和 RxJava 的一些東西。
ReactiveX for Python 遵循 PEP 8,因此所有函數和方法名稱均採用snake_cased
,即小寫,單字之間用下劃線分隔,以提高可讀性。
因此.NET 程式碼如:
var group = source . GroupBy ( i => i % 3 ) ;
在Python中需要用_
來寫:
group = source . pipe ( ops . group_by ( lambda i : i % 3 ))
使用 ReactiveX for Python,當運算子有多個選用參數時,您應該使用命名關鍵字參數而不是位置參數。 RxPY 不會嘗試偵測您提供給操作員(或不提供)哪些參數。
該專案是使用 Poetry 進行管理的。程式碼採用 Black, isort 格式。使用pyright 和mypy 對程式碼進行靜態類型檢查。
如果您想利用預設的 VSCode 集成,請先配置 Poetry 以在儲存庫中建立其虛擬環境:
poetry config virtualenvs.in-project true
克隆儲存庫後,啟動工具:
poetry install
poetry run pre-commit install
運行單元測試:
poetry run pytest
執行程式碼檢查(手動):
poetry run pre-commit run --all-files