该库已被弃用,不再受管理或支持。当前活跃的社区项目可以在 https://github.com/faust-streaming/faust 找到
版本: | 1.10.4 |
---|---|
网址: | http://faust.readthedocs.io/ |
下载: | http://pypi.org/project/faust |
来源: | http://github.com/robinhood/faust |
关键词: | 分布式、流、异步、处理、数据、队列、状态管理 |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust是一个流处理库,将 Kafka Streams 的思想移植到了 Python。
Robinhood 使用它来构建高性能分布式系统和实时数据管道,每天处理数十亿个事件。
Faust 提供流处理和事件处理,与 Kafka Streams、Apache Spark/Storm/Samza/Flink 等工具有相似之处,
它不使用 DSL,只是 Python!这意味着您可以在流处理时使用所有您喜欢的 Python 库:NumPy、PyTorch、Pandas、NLTK、Django、Flask、SQLAlchemy、++
Faust 需要 Python 3.6 或更高版本才能使用新的 async/await 语法和变量类型注释。
以下是处理传入订单流的示例:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
Agent 装饰器定义了一个“流处理器”,它本质上是从 Kafka 主题进行消费,并对其接收到的每个事件执行一些操作。
该代理是一个async def
函数,因此还可以异步执行其他操作,例如 Web 请求。
该系统可以持久保存状态,就像数据库一样。表被命名为分布式键/值存储,您可以将其用作常规 Python 字典。
表使用用 C++ 编写的超快速嵌入式数据库(称为 RocksDB)本地存储在每台计算机上。
表格还可以存储可以选择“窗口化”的聚合计数,以便您可以跟踪“最后一天的点击次数”或“最后一小时的点击次数”。例如。与 Kafka Streams 一样,我们支持翻滚、跳跃和滑动时间窗口,并且旧窗口可以过期以阻止数据填满。
为了可靠性,我们使用 Kafka 主题作为“预写日志”。每当密钥发生更改时,我们都会发布到更改日志。备用节点使用此更改日志来保留数据的精确副本,并在任何节点发生故障时启用即时恢复。
对于用户来说,表只是一个字典,但数据在重新启动之间保留并跨节点复制,因此在故障转移时其他节点可以自动接管。
您可以按 URL 计算页面浏览量:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
发送到 Kafka 主题的数据是分区的,这意味着点击将按 URL 进行分片,这样同一 URL 的每个计数都将传递到同一个 Faust 工作实例。
Faust 支持任何类型的流数据:字节、Unicode 和序列化结构,但还附带使用现代 Python 语法来描述流中的键和值如何序列化的“模型”:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust 是静态类型的,使用mypy
类型检查器,因此您可以在编写应用程序时利用静态类型。
Faust 源代码很小,组织良好,是学习 Kafka Streams 实现的良好资源。
Faust 非常容易使用。要开始使用其他流处理解决方案,您需要复杂的 hello-world 项目和基础设施要求。 Faust 只需要 Kafka,其余的只是 Python,所以如果你了解 Python,你就已经可以使用 Faust 进行流处理,并且它可以与几乎任何东西集成。
这是您可以制作的更简单的应用程序之一:
进口浮士德 问候语类(faust.Record): 发件人名称:str 收件人名称:str app = faust.App('hello-app',broker='kafka://localhost') topic = app.topic('hello-topic', value_type=问候语) @app.agent(主题) 异步定义你好(问候语): 问候语中的异步问候语: print(f'你好,从 {greeting.from_name} 到 {greeting.to_name}') @app.timer(间隔=1.0) 异步 def example_sender(app): 等待你好.发送( value=问候语(from_name='浮士德', to_name='你'), ) 如果 __name__ == '__main__': 应用程序.main()
您可能对 async 和 wait 关键字有点害怕,但您不必知道asyncio
如何工作即可使用 Faust:只需模仿示例,就可以了。
该示例应用程序启动两个任务:一个是处理流,另一个是后台线程向该流发送事件。在现实应用程序中,您的系统将向 Kafka 主题发布事件,您的处理器可以从中使用这些事件,并且后台线程只需要将数据输入到我们的示例中。
您可以通过 Python 包索引 (PyPI) 或从源代码安装 Faust。
使用 pip 安装:
$ pip install -U faust
Faust 还定义了一组setuptools
扩展,可用于安装 Faust 以及给定功能的依赖项。
您可以在您的要求中或使用括号在pip
命令行上指定这些内容。使用逗号分隔多个包:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
提供以下捆绑包:
faust[rocksdb] : | 使用 RocksDB 存储 Faust 表状态。 推荐用于生产。 |
---|
faust[redis] : | 使用 Redis_ 作为简单的缓存后端(Memcached 风格)。 |
---|
faust[yaml] : | 用于在流中使用 YAML 和PyYAML 库。 |
---|
faust[fast] : | 用于将所有可用的 C 加速扩展安装到 Faust 核心。 |
---|
faust[datadog] : | 用于使用 Datadog Faust 监视器。 |
---|---|
faust[statsd] : | 用于使用 Statsd Faust 监视器。 |
faust[uvloop] : | 用于将 Faust 与uvloop 一起使用。 |
---|---|
faust[eventlet] : | 用于将 Faust 与eventlet 结合使用 |
faust[debug] : | 用于使用aiomonitor 连接和调试正在运行的 Faust 工作线程。 |
---|---|
faust[setproctitle] : | 安装setproctitle 模块后,Faust 工作程序将使用它在ps / top 列表中设置更好的进程名称。还随fast 包和debug 包一起安装。 |
从 http://pypi.org/project/faust 下载最新版本的 Faust
您可以通过执行以下操作来安装它:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
如果您当前未使用 virtualenv,则必须以特权用户身份执行最后一个命令。
您可以使用以下pip
命令安装 Faust 的最新快照:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
是的!使用eventlet
作为与asyncio
集成的桥梁。
eventlet
此方法适用于任何可与eventlet
配合使用的阻塞 Python 库。
使用eventlet
需要安装aioeventlet
模块,您可以将其与 Faust 一起作为捆绑包安装:
$ pip install -U faust[eventlet]
然后,要实际使用 eventlet 作为事件循环,您必须使用faust
程序的-L <faust --loop>
参数:
$ faust -L eventlet -A myproj worker -l info
或者在入口点脚本的顶部添加import mode.loop.eventlet
:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
警告
这是非常重要的,它位于模块的最顶部,并且它在导入库之前执行。
是的!使用tornado.platform.asyncio
桥:http://www.tornadoweb.org/en/stable/asyncio.html
是的!使用asyncio
反应器实现:https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
不需要。Faust 需要 Python 3.6 或更高版本,因为它大量使用 Python 3.6 中引入的功能(异步、等待、变量类型注释)。
您可能需要增加打开文件的最大数量的限制。以下文章解释了如何在 OS X 上执行此操作:https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust 支持版本 >= 0.10 的 kafka。
有关 Faust 的使用、开发和未来的讨论,请加入`fauststream`_ Slack。
如果您有任何建议、错误报告或烦恼,请将其报告给我们的问题跟踪器:https://github.com/robinhood/faust/issues/
该软件根据新 BSD 许可证获得许可。请参阅顶级分发目录中的LICENSE
文件以获取完整的许可证文本。
Faust 的开发在 GitHub 上进行:https://github.com/robinhood/faust
我们强烈鼓励您参与 Faust 的开发。
请务必阅读文档中的“为 Faust 做贡献”部分。
在项目代码库、问题跟踪器、聊天室和邮件列表中进行交互的每个人都应遵循 Faust 行为准则。
作为这些项目的贡献者和维护者,为了培育一个开放和热情的社区,我们承诺尊重所有通过报告问题、发布功能请求、更新文档、提交拉取请求或补丁以及其他活动做出贡献的人。
我们致力于为每个人提供无骚扰的参与这些项目的体验,无论其经验水平、性别、性别认同和表达、性取向、残疾、个人外表、体型、种族、民族、年龄、宗教或国籍。
参与者不可接受的行为示例包括:
项目维护者有权利和责任删除、编辑或拒绝不符合本行为准则的评论、提交、代码、wiki 编辑、问题和其他贡献。通过采用本行为准则,项目维护者承诺公平且一致地将这些原则应用于管理该项目的各个方面。不遵守或执行行为准则的项目维护人员可能会被永久从项目团队中除名。
当个人代表项目或其社区时,本行为准则适用于项目空间和公共空间。
可以通过提出问题或联系一名或多名项目维护人员来报告辱骂、骚扰或其他不可接受的行为。
本行为准则改编自《贡献者契约》1.2.0 版,网址为 http://contributor-covenant.org/version/1/2/0/。