Esta biblioteca ha quedado obsoleta y ya no se administra ni se admite. El proyecto comunitario activo actual se puede encontrar en https://github.com/faust-streaming/faust
Versión: | 1.10.4 |
---|---|
Web: | http://faust.readthedocs.io/ |
Descargar: | http://pypi.org/project/faust |
Fuente: | http://github.com/robinhood/faust |
Palabras clave: | distribuido, flujo, asíncrono, procesamiento, datos, cola, gestión de estado |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust es una biblioteca de procesamiento de flujos que traslada las ideas de Kafka Streams a Python.
Se utiliza en Robinhood para construir sistemas distribuidos de alto rendimiento y canales de datos en tiempo real que procesan miles de millones de eventos todos los días.
Faust proporciona procesamiento de transmisiones y procesamiento de eventos , y comparte similitudes con herramientas como Kafka Streams, Apache Spark/Storm/Samza/Flink,
No utiliza DSL, ¡es solo Python! Esto significa que puede utilizar todas sus bibliotecas de Python favoritas al procesar transmisiones: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++
Faust requiere Python 3.6 o posterior para la nueva sintaxis async/await y las anotaciones de tipo variable.
A continuación se muestra un ejemplo de procesamiento de un flujo de pedidos entrantes:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
El decorador de agentes define un "procesador de flujo" que esencialmente consume de un tema de Kafka y hace algo por cada evento que recibe.
El agente es una función async def
, por lo que también puede realizar otras operaciones de forma asíncrona, como solicitudes web.
Este sistema puede persistir en el estado, actuando como una base de datos. Las tablas son almacenes de clave/valor distribuidos con nombres que puede utilizar como diccionarios de Python normales.
Las tablas se almacenan localmente en cada máquina utilizando una base de datos integrada súper rápida escrita en C++, llamada RocksDB.
Las tablas también pueden almacenar recuentos agregados que, opcionalmente, están en "ventanas" para que pueda realizar un seguimiento del "número de clics del último día" o del "número de clics en la última hora". Por ejemplo. Al igual que Kafka Streams, admitimos ventanas de tiempo giratorias, saltantes y deslizantes, y las ventanas antiguas pueden expirar para evitar que los datos se llenen.
Para mayor confiabilidad utilizamos un tema de Kafka como "registro de escritura anticipada". Cada vez que se cambia una clave, la publicamos en el registro de cambios. Los nodos en espera consumen este registro de cambios para mantener una réplica exacta de los datos y permiten la recuperación instantánea en caso de que alguno de los nodos falle.
Para el usuario, una tabla es solo un diccionario, pero los datos persisten entre reinicios y se replican entre nodos, de modo que, en caso de conmutación por error, otros nodos puedan tomar el control automáticamente.
Puede contar las visitas a la página por URL:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Los datos enviados al tema de Kafka están particionados, lo que significa que los clics se fragmentarán por URL de tal manera que cada recuento para la misma URL se entregará a la misma instancia de trabajador de Faust.
Faust admite cualquier tipo de datos de flujo: bytes, Unicode y estructuras serializadas, pero también viene con "Modelos" que usan la sintaxis moderna de Python para describir cómo se serializan las claves y los valores en los flujos:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust se escribe estáticamente mediante el verificador de tipos mypy
, por lo que puede aprovechar los tipos estáticos al escribir aplicaciones.
El código fuente de Faust es pequeño, está bien organizado y sirve como un buen recurso para aprender la implementación de Kafka Streams.
Fausto es extremadamente fácil de usar. Para comenzar a utilizar otras soluciones de procesamiento de flujo, debe tener proyectos de hola mundo y requisitos de infraestructura complicados. Faust solo requiere Kafka, el resto es solo Python, por lo que si conoce Python, ya puede usar Faust para realizar procesamiento de secuencias y puede integrarse con casi cualquier cosa.
Esta es una de las aplicaciones más fáciles que puedes hacer:
importar fausto Saludo de clase (faust.Record): from_name: cadena to_name: cadena aplicación = fausto.App('hola-aplicación', corredor='kafka://localhost') tema = app.topic('hola-tema', value_type=Saludo) @app.agent(tema) asíncrono def hola (saludos): asíncrono para saludar en saludos: print(f'Hola de {saludo.de_nombre} a {saludo.a_nombre}') @app.timer(intervalo=1.0) asíncrono def ejemplo_remitente (aplicación): espera hola.enviar( valor=Saludo(from_name='Fausto', to_name='tú'), ) si __nombre__ == '__principal__': aplicación.principal()
Probablemente te sientas un poco intimidado por las palabras clave async y await, pero no es necesario que sepas cómo funciona asyncio
para usar Faust: simplemente imita los ejemplos y estarás bien.
La aplicación de ejemplo inicia dos tareas: una procesa una secuencia y la otra es un subproceso en segundo plano que envía eventos a esa secuencia. En una aplicación de la vida real, su sistema publicará eventos en temas de Kafka que sus procesadores pueden consumir, y el hilo en segundo plano solo es necesario para introducir datos en nuestro ejemplo.
Puede instalar Faust a través del índice de paquetes de Python (PyPI) o desde la fuente.
Para instalar usando pip:
$ pip install -U faust
Faust también define un grupo de extensiones setuptools
que se pueden usar para instalar Faust y las dependencias de una característica determinada.
Puede especificarlos en sus requisitos o en la línea de comandos pip
usando corchetes. Separe varios paquetes usando la coma:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
Los siguientes paquetes están disponibles:
faust[rocksdb] : | para usar RocksDB para almacenar el estado de la tabla Fausto. Recomendado en producción. |
---|
faust[redis] : | para usar Redis_ como un backend de almacenamiento en caché simple (estilo Memcached). |
---|
faust[yaml] : | para usar YAML y la biblioteca PyYAML en transmisiones. |
---|
faust[fast] : | para instalar todas las extensiones de aceleración de C disponibles en Faust Core. |
---|
faust[datadog] : | para utilizar el monitor Datadog Faust. |
---|---|
faust[statsd] : | para usar el monitor Statsd Faust. |
faust[uvloop] : | para usar Fausto con uvloop . |
---|---|
faust[eventlet] : | para usar Fausto con eventlet |
faust[debug] : | para usar aiomonitor para conectar y depurar un trabajador Faust en ejecución. |
---|---|
faust[setproctitle] : | Cuando el módulo setproctitle esté instalado, el trabajador de Faust lo usará para establecer un nombre de proceso más agradable en los listados ps / top . También se instala con los paquetes fast y debug . |
Descargue la última versión de Faust desde http://pypi.org/project/faust
Puedes instalarlo haciendo:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
El último comando debe ejecutarse como usuario privilegiado si actualmente no está utilizando un virtualenv.
Puede instalar la última instantánea de Faust usando el siguiente comando pip
:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
¡Sí! Utilice eventlet
como puente para integrar con asyncio
.
eventlet
Este enfoque funciona con cualquier biblioteca Python de bloqueo que pueda funcionar con eventlet
.
El uso de eventlet
requiere que instales el módulo aioeventlet
, y puedes instalarlo como un paquete junto con Faust:
$ pip install -U faust[eventlet]
Luego, para usar eventlet como bucle de eventos, debe usar el argumento -L <faust --loop>
para el programa faust
:
$ faust -L eventlet -A myproj worker -l info
o agregue import mode.loop.eventlet
en la parte superior de su script de punto de entrada:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
Advertencia
Es muy importante que esté en la parte superior del módulo y que se ejecute antes de importar bibliotecas.
¡Sí! Utilice el puente tornado.platform.asyncio
: http://www.tornadoweb.org/en/stable/asyncio.html
¡Sí! Utilice la implementación del reactor asyncio
: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
No. Faust requiere Python 3.6 o posterior, ya que utiliza en gran medida características que se introdujeron en Python 3.6 (async, await, anotaciones de tipo variable).
Es posible que deba aumentar el límite de la cantidad máxima de archivos abiertos. La siguiente publicación explica cómo hacerlo en OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Fausto soporta kafka con versión >= 0.10.
Para debates sobre el uso, el desarrollo y el futuro de Faust, únase a `fauststream`_ Slack.
Si tiene alguna sugerencia, informe de error o molestia, infórmelo a nuestro rastreador de problemas en https://github.com/robinhood/faust/issues/
Este software tiene la licencia Nueva Licencia BSD. Consulte el archivo LICENSE
en el directorio de distribución superior para obtener el texto completo de la licencia.
El desarrollo de Fausto ocurre en GitHub: https://github.com/robinhood/faust
Le recomendamos encarecidamente que participe en el desarrollo de Fausto.
Asegúrese de leer también la sección Contribuyendo a Fausto en la documentación.
Se espera que todos los que interactúan en las bases de códigos, rastreadores de problemas, salas de chat y listas de correo del proyecto sigan el Código de conducta de Faust.
Como contribuyentes y mantenedores de estos proyectos, y con el fin de fomentar una comunidad abierta y acogedora, nos comprometemos a respetar a todas las personas que contribuyen informando problemas, publicando solicitudes de funciones, actualizando documentación, enviando solicitudes de extracción o parches y otras actividades.
Estamos comprometidos a hacer que la participación en estos proyectos sea una experiencia libre de acoso para todos, independientemente del nivel de experiencia, género, identidad y expresión de género, orientación sexual, discapacidad, apariencia personal, tamaño corporal, raza, etnia, edad, religión o nacionalidad.
Ejemplos de comportamiento inaceptable por parte de los participantes incluyen:
Los mantenedores del proyecto tienen el derecho y la responsabilidad de eliminar, editar o rechazar comentarios, confirmaciones, códigos, ediciones de wiki, problemas y otras contribuciones que no estén alineadas con este Código de conducta. Al adoptar este Código de conducta, los mantenedores del proyecto se comprometen a aplicar estos principios de manera justa y consistente en todos los aspectos de la gestión de este proyecto. Los mantenedores del proyecto que no sigan ni hagan cumplir el Código de conducta pueden ser eliminados permanentemente del equipo del proyecto.
Este código de conducta se aplica tanto dentro de los espacios del proyecto como en los espacios públicos cuando un individuo representa el proyecto o su comunidad.
Se pueden informar casos de comportamiento abusivo, acosador o de otro modo inaceptable abriendo un problema o contactando a uno o más de los mantenedores del proyecto.
Este Código de conducta está adaptado del Convenio del colaborador, versión 1.2.0 disponible en http://contributor-covenant.org/version/1/2/0/.