Эта библиотека устарела и больше не управляется и не поддерживается. Текущий активный проект сообщества можно найти по адресу https://github.com/faust-streaming/faust.
Версия: | 1.10.4 |
---|---|
Интернет: | http://faust.readthedocs.io/ |
Скачать: | http://pypi.org/project/faust |
Источник: | http://github.com/robinhood/faust |
Ключевые слова: | распределенный, поток, асинхронный, обработка, данные, очередь, управление состоянием |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust — это библиотека потоковой обработки, переносящая идеи Kafka Streams на Python.
Он используется в Robinhood для создания высокопроизводительных распределенных систем и конвейеров данных в реальном времени, которые ежедневно обрабатывают миллиарды событий.
Faust обеспечивает как потоковую обработку , так и обработку событий , имея сходство с такими инструментами, как Kafka Streams, Apache Spark/Storm/Samza/Flink,
Он не использует DSL, это просто Python! Это означает, что вы можете использовать все ваши любимые библиотеки Python при потоковой обработке: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++.
Faust требует Python 3.6 или более поздней версии для нового синтаксиса async/await и аннотаций типов переменных.
Вот пример обработки потока входящих заказов:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
Декоратор Agent определяет «потоковый процессор», который, по сути, получает информацию из темы Kafka и делает что-то для каждого полученного события.
Агент представляет собой async def
, поэтому может также выполнять другие операции асинхронно, например веб-запросы.
Эта система может сохранять состояние, действуя как база данных. Таблицы — это распределенные хранилища ключей и значений, которые можно использовать как обычные словари Python.
Таблицы хранятся локально на каждом компьютере с использованием сверхбыстрой встроенной базы данных, написанной на C++, которая называется RocksDB.
Таблицы также могут хранить совокупные значения, которые могут быть «оконными», чтобы вы могли отслеживать «количество кликов за последний день» или «количество кликов за последний час». например. Как и Kafka Streams, мы поддерживаем переворачивающиеся, прыгающие и скользящие окна времени, а срок действия старых окон может быть истечен, чтобы предотвратить заполнение данных.
Для надежности мы используем тему Kafka как «журнал упреждающей записи». Всякий раз, когда ключ меняется, мы публикуем его в журнале изменений. Резервные узлы используют этот журнал изменений, чтобы сохранить точную копию данных и обеспечить мгновенное восстановление в случае сбоя любого из узлов.
Для пользователя таблица — это просто словарь, но данные сохраняются между перезапусками и реплицируются между узлами, поэтому при аварийном переключении другие узлы могут автоматически взять на себя управление.
Вы можете подсчитать просмотры страниц по URL:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Данные, отправляемые в тему Kafka, секционируются, что означает, что клики будут сегментироваться по URL-адресам таким образом, что каждый счетчик для одного и того же URL-адреса будет доставлен в один и тот же экземпляр рабочего Faust.
Faust поддерживает любой тип потоковых данных: байты, Unicode и сериализованные структуры, но также поставляется с «моделями», которые используют современный синтаксис Python для описания того, как сериализуются ключи и значения в потоках:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust статически типизирован с использованием средства проверки типов mypy
, поэтому вы можете использовать преимущества статических типов при написании приложений.
Исходный код Faust небольшой, хорошо организованный и служит хорошим ресурсом для изучения реализации Kafka Streams.
Фауст чрезвычайно прост в использовании. Чтобы начать использовать другие решения для потоковой обработки, у вас есть сложные проекты hello world и требования к инфраструктуре. Для Faust требуется только Kafka, остальное — это только Python, поэтому, если вы знаете Python, вы уже можете использовать Faust для потоковой обработки, и он может интегрироваться практически с чем угодно.
Вот одно из самых простых приложений, которые вы можете сделать:
импортировать Фауст Приветствие класса (faust.Record): from_name: ул to_name: ул app = faust.App('hello-app',broker='kafka://localhost') theme = app.topic('hello-topic', value_type=Приветствие) @app.agent(тема) асинхронное определение привет (приветствие): асинхронность для приветствия в приветствиях: print(f'Привет от {greeting.from_name} до {greeting.to_name}') @app.timer(интервал=1,0) асинхронная защита example_sender (приложение): жду привет.отправить( value=Приветствие(from_name='Фауст', to_name='вы'), ) если __name__ == '__main__': приложение.main()
Вероятно, вас немного напугали ключевые слова async и await, но вам не обязательно знать, как работает asyncio
, чтобы использовать Faust: просто подражайте примерам, и все будет в порядке.
Пример приложения запускает две задачи: одна обрабатывает поток, другая — фоновый поток, отправляющий события в этот поток. В реальном приложении ваша система будет публиковать события в темах Kafka, которые смогут использовать ваши процессоры, а фоновый поток необходим только для подачи данных в наш пример.
Вы можете установить Faust либо через индекс пакетов Python (PyPI), либо из исходного кода.
Для установки с помощью pip:
$ pip install -U faust
Faust также определяет группу расширений setuptools
, которые можно использовать для установки Faust и зависимостей для данной функции.
Вы можете указать их в своих требованиях или в командной строке pip
используя скобки. Разделяйте несколько пакетов запятой:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
Доступны следующие пакеты:
faust[rocksdb] : | за использование RocksDB для хранения состояния таблицы Faust. Рекомендован в производство. |
---|
faust[redis] : | для использования Redis_ в качестве простого механизма кэширования (в стиле Memcached). |
---|
faust[yaml] : | для использования YAML и библиотеки PyYAML в потоках. |
---|
faust[fast] : | для установки всех доступных расширений ускорения C для ядра Faust. |
---|
faust[datadog] : | для использования монитора Datadog Faust. |
---|---|
faust[statsd] : | для использования монитора Statsd Faust. |
faust[uvloop] : | за использование Faust с uvloop . |
---|---|
faust[eventlet] : | для использования Faust с eventlet |
faust[debug] : | за использование aiomonitor для подключения и отладки работающего рабочего Faust. |
---|---|
faust[setproctitle] : | когда модуль setproctitle установлен, рабочий Faust будет использовать его для установки более красивого имени процесса в списках ps / top . Также устанавливается вместе с fast и debug пакетами. |
Загрузите последнюю версию Faust с http://pypi.org/project/faust.
Вы можете установить его, выполнив:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
Последняя команда должна быть выполнена от имени привилегированного пользователя, если вы в данный момент не используете virtualenv.
Вы можете установить последний снимок Faust, используя следующую команду pip
:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
Да! Используйте eventlet
как мост для интеграции с asyncio
.
eventlet
Этот подход работает с любой блокирующей библиотекой Python, которая может работать с eventlet
.
Для использования eventlet
вам необходимо установить модуль aioeventlet
, и вы можете установить его в виде пакета вместе с Faust:
$ pip install -U faust[eventlet]
Затем, чтобы фактически использовать eventlet в качестве цикла событий, вам нужно либо использовать аргумент -L <faust --loop>
для программы faust
:
$ faust -L eventlet -A myproj worker -l info
или добавьте import mode.loop.eventlet
в начало сценария точки входа:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
Предупреждение
Очень важно, чтобы он находился в самом верху модуля и выполнялся до импорта библиотек.
Да! Используйте tornado.platform.asyncio
: http://www.tornadoweb.org/en/stable/asyncio.html.
Да! Используйте реализацию реактора asyncio
: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html.
Нет. Для Faust требуется Python 3.6 или более поздней версии, поскольку он активно использует функции, представленные в Python 3.6 (async, await, аннотации типов переменных).
Возможно, вам придется увеличить лимит максимального количества открытых файлов. В следующем сообщении объясняется, как это сделать в OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust поддерживает Kafka версии >= 0.10.
Для обсуждения использования, развития и будущего Faust присоединяйтесь к `fauststream`_ Slack.
Если у вас есть какие-либо предложения, сообщения об ошибках или неприятности, сообщите о них в нашу систему отслеживания проблем по адресу https://github.com/robinhood/faust/issues/.
Это программное обеспечение распространяется по новой лицензии BSD. Полный текст лицензии см. в файле LICENSE
в верхнем каталоге распространения.
Разработка Faust ведется на GitHub: https://github.com/robinhood/faust.
Мы настоятельно рекомендуем вам принять участие в разработке Faust.
Обязательно прочтите раздел «Вклад в Faust» в документации.
Ожидается, что все, кто взаимодействует с базами кода проекта, системами отслеживания проблем, чатами и списками рассылки, будут следовать Кодексу поведения Фауста.
Как участники и сопровождающие этих проектов, а также в интересах развития открытого и гостеприимного сообщества, мы обязуемся уважать всех людей, которые вносят свой вклад, сообщая о проблемах, публикуя запросы функций, обновляя документацию, отправляя запросы на включение или исправления и другие действия.
Мы стремимся сделать участие в этих проектах свободным от притеснений для всех, независимо от уровня опыта, пола, гендерной идентичности и гендерного самовыражения, сексуальной ориентации, инвалидности, внешнего вида, размера тела, расы, этнической принадлежности, возраста, религии или национальность.
Примеры неприемлемого поведения участников включают в себя:
Сопровождающие проекта имеют право и ответственность удалять, редактировать или отклонять комментарии, коммиты, код, изменения вики, проблемы и другие материалы, которые не соответствуют настоящему Кодексу поведения. Принимая настоящий Кодекс поведения, специалисты по сопровождению проекта обязуются справедливо и последовательно применять эти принципы ко всем аспектам управления этим проектом. Специалисты по сопровождению проекта, которые не соблюдают или не обеспечивают соблюдение Кодекса поведения, могут быть навсегда исключены из команды проекта.
Этот кодекс поведения применяется как в помещениях проекта, так и в общественных местах, когда человек представляет проект или его сообщество.
О случаях оскорбительного, преследующего или иного неприемлемого поведения можно сообщить, открыв проблему или связавшись с одним или несколькими сопровождающими проекта.
Настоящий Кодекс поведения является адаптированным вариантом Соглашения участников, версия 1.2.0, доступного по адресу http://contributor-covenant.org/version/1/2/0/.