一个使用 Python 中的可观察集合和查询运算符函数编写异步和基于事件的程序的库
对于 v3.X,请转到 v3 分支。
ReactiveX for Python v4.x 在 Python 3.7 或更高版本上运行。安装:
pip3 install reactivex
ReactiveX for Python (RxPY) 是一个库,用于在 Python 中使用可观察序列和可管道查询运算符来编写异步和基于事件的程序。使用 Rx,开发人员可以使用 Observables 表示异步数据流,使用运算符查询异步数据流,并使用调度程序参数化数据/事件流中的并发性。
import reactivex as rx
from reactivex import operators as ops
source = rx . of ( "Alpha" , "Beta" , "Gamma" , "Delta" , "Epsilon" )
composed = source . pipe (
ops . map ( lambda s : len ( s )),
ops . filter ( lambda i : i >= 5 )
)
composed . subscribe ( lambda value : print ( "Received {0}" . format ( value )))
阅读文档以了解 ReactiveX 的原理并获取可用运算符的完整参考。
如果您需要从 RxPY v1.x 或 v3.x 迁移代码,请阅读迁移部分。
此处还提供了第三方文档列表。
加入 GitHub 讨论上的对话!如果您有任何疑问或建议。
ReactiveX for Python 是一个相当完整的 Rx 实现,拥有超过 120 个运算符,以及超过 1300 个通过的单元测试。 RxPY 主要是 RxJS 的直接移植,但在线程和阻塞运算符方面也借鉴了 Rx.NET 和 RxJava 的一些东西。
ReactiveX for Python 遵循 PEP 8,因此所有函数和方法名称均采用snake_cased
,即小写,单词之间用下划线分隔,以提高可读性。
因此.NET 代码如:
var group = source . GroupBy ( i => i % 3 ) ;
在Python中需要用_
来写:
group = source . pipe ( ops . group_by ( lambda i : i % 3 ))
使用 ReactiveX for Python,当运算符有多个可选参数时,您应该使用命名关键字参数而不是位置参数。 RxPY 不会尝试检测您向操作员提供(或不提供)哪些参数。
该项目是使用 Poetry 进行管理的。代码采用 Black, isort 格式。使用pyright 和mypy 对代码进行静态类型检查。
如果您想利用默认的 VSCode 集成,请首先配置 Poetry 以在存储库中创建其虚拟环境:
poetry config virtualenvs.in-project true
克隆存储库后,激活工具:
poetry install
poetry run pre-commit install
运行单元测试:
poetry run pytest
运行代码检查(手动):
poetry run pre-commit run --all-files