Esta biblioteca foi descontinuada e não é mais gerenciada ou suportada. O atual projeto comunitário ativo pode ser encontrado em https://github.com/faust-streaming/faust
Versão: | 1.10.4 |
---|---|
Rede: | http://faust.readthedocs.io/ |
Download: | http://pypi.org/project/faust |
Fonte: | http://github.com/robinhood/faust |
Palavras-chave: | distribuído, fluxo, assíncrono, processamento, dados, fila, gerenciamento de estado |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust é uma biblioteca de processamento de stream, que transfere as ideias do Kafka Streams para Python.
Ele é usado na Robinhood para construir sistemas distribuídos de alto desempenho e pipelines de dados em tempo real que processam bilhões de eventos todos os dias.
Faust fornece processamento de fluxo e processamento de eventos , compartilhando semelhança com ferramentas como Kafka Streams, Apache Spark/Storm/Samza/Flink,
Não usa DSL, é apenas Python! Isso significa que você pode usar todas as suas bibliotecas Python favoritas ao processar fluxo: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++
Faust requer Python 3.6 ou posterior para a nova sintaxe async/await e anotações de tipo de variável.
Aqui está um exemplo de processamento de um fluxo de pedidos recebidos:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
O decorador do Agente define um "processador de fluxo" que essencialmente consome um tópico Kafka e faz algo para cada evento que recebe.
O agente é uma função async def
, portanto também pode executar outras operações de forma assíncrona, como solicitações da web.
Este sistema pode persistir no estado, agindo como um banco de dados. As tabelas são denominadas armazenamentos de chave/valor distribuídos que você pode usar como dicionários regulares do Python.
As tabelas são armazenadas localmente em cada máquina usando um banco de dados incorporado super rápido escrito em C++, chamado RocksDB.
As tabelas também podem armazenar contagens agregadas que são opcionalmente "em janela" para que você possa acompanhar o "número de cliques do último dia" ou o "número de cliques na última hora". por exemplo. Assim como o Kafka Streams, oferecemos suporte a janelas de tempo oscilantes, saltitantes e deslizantes, e janelas antigas podem expirar para impedir que os dados sejam preenchidos.
Para maior confiabilidade, usamos um tópico Kafka como "write-ahead-log". Sempre que uma chave é alterada, publicamos no changelog. Os nós em espera consomem esse changelog para manter uma réplica exata dos dados e permitem a recuperação instantânea caso algum dos nós falhe.
Para o usuário, uma tabela é apenas um dicionário, mas os dados são persistidos entre as reinicializações e replicados entre os nós, de modo que, no failover, outros nós possam assumir o controle automaticamente.
Você pode contar visualizações de página por URL:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Os dados enviados para o tópico Kafka são particionados, o que significa que os cliques serão fragmentados por URL de forma que cada contagem para o mesmo URL seja entregue à mesma instância de trabalho do Faust.
Faust suporta qualquer tipo de dados de fluxo: bytes, Unicode e estruturas serializadas, mas também vem com "Modelos" que usam a sintaxe Python moderna para descrever como chaves e valores em fluxos são serializados:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust é digitado estaticamente, usando o verificador de tipo mypy
, para que você possa aproveitar as vantagens dos tipos estáticos ao escrever aplicativos.
O código-fonte do Faust é pequeno, bem organizado e serve como um bom recurso para aprender a implementação do Kafka Streams.
Faust é extremamente fácil de usar. Para começar a usar outras soluções de processamento de fluxo, você tem projetos complexos e requisitos de infraestrutura. O Faust requer apenas Kafka, o resto é apenas Python, então se você conhece Python, já pode usar o Faust para fazer processamento de stream e ele pode ser integrado com praticamente qualquer coisa.
Aqui está uma das aplicações mais fáceis que você pode fazer:
importar fausto saudação da classe (faust.Record): from_name: str to_name: str app = faust.App('hello-app', corretor='kafka://localhost') tópico = app.topic('hello-topic', value_type=Saudação) @app.agent(tópico) assíncrono def olá (saudações): assíncrono para saudação em saudações: print(f'Olá de {greeting.from_name} para {greeting.to_name}') @app.timer(intervalo=1,0) assíncrono def exemplo_sender(aplicativo): aguarde olá.send( value=Saudação(from_name='Fausto', to_name='você'), ) se __nome__ == '__principal__': app.main()
Você provavelmente está um pouco intimidado pelas palavras-chave async e await, mas não precisa saber como asyncio
funciona para usar o Faust: apenas imite os exemplos e você ficará bem.
O aplicativo de exemplo inicia duas tarefas: uma é processar um fluxo, a outra é um encadeamento em segundo plano que envia eventos para esse fluxo. Em uma aplicação da vida real, seu sistema publicará eventos em tópicos Kafka que seus processadores podem consumir, e o thread de segundo plano só é necessário para alimentar dados em nosso exemplo.
Você pode instalar o Faust por meio do Python Package Index (PyPI) ou da fonte.
Para instalar usando pip:
$ pip install -U faust
O Faust também define um grupo de extensões setuptools
que podem ser usadas para instalar o Faust e as dependências de um determinado recurso.
Você pode especificá-los em seus requisitos ou na linha de comando pip
usando colchetes. Separe vários pacotes usando vírgula:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
Os seguintes pacotes estão disponíveis:
faust[rocksdb] : | por usar RocksDB para armazenar o estado da tabela Faust. Recomendado em produção. |
---|
faust[redis] : | para usar Redis_ como um back-end de cache simples (estilo Memcached). |
---|
faust[yaml] : | para usar YAML e a biblioteca PyYAML em streams. |
---|
faust[fast] : | para instalar todas as extensões de aceleração C disponíveis no núcleo Faust. |
---|
faust[datadog] : | para usar o monitor Datadog Faust. |
---|---|
faust[statsd] : | para usar o monitor Statsd Faust. |
faust[uvloop] : | por usar Faust com uvloop . |
---|---|
faust[eventlet] : | para usar Faust com eventlet |
faust[debug] : | para usar aiomonitor para conectar e depurar um trabalhador Faust em execução. |
---|---|
faust[setproctitle] : | quando o módulo setproctitle estiver instalado, o trabalhador Faust o usará para definir um nome de processo melhor nas listagens ps / top . Também instalado com os pacotes fast e debug . |
Baixe a versão mais recente do Faust em http://pypi.org/project/faust
Você pode instalá-lo fazendo:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
O último comando deve ser executado como um usuário privilegiado se você não estiver usando um virtualenv.
Você pode instalar o snapshot mais recente do Faust usando o seguinte comando pip
:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
Sim! Use eventlet
como ponte para integração com asyncio
.
eventlet
Essa abordagem funciona com qualquer biblioteca Python de bloqueio que possa funcionar com eventlet
.
O uso eventlet
requer a instalação do módulo aioeventlet
, e você pode instalá-lo como um pacote junto com o Faust:
$ pip install -U faust[eventlet]
Então, para realmente usar o eventlet como loop de eventos, você deve usar o argumento -L <faust --loop>
para o programa faust
:
$ faust -L eventlet -A myproj worker -l info
ou adicione import mode.loop.eventlet
na parte superior do script do ponto de entrada:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
Aviso
É muito importante que isso esteja no topo do módulo e que seja executado antes de você importar as bibliotecas.
Sim! Use a ponte tornado.platform.asyncio
: http://www.tornadoweb.org/en/stable/asyncio.html
Sim! Use a implementação do reator asyncio
: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
Não. Faust requer Python 3.6 ou posterior, uma vez que utiliza intensamente recursos que foram introduzidos em Python 3.6 (anotações assíncronas, de espera, de tipo variável).
Pode ser necessário aumentar o limite do número máximo de arquivos abertos. A postagem a seguir explica como fazer isso no OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust suporta kafka com versão >= 0.10.
Para discussões sobre o uso, desenvolvimento e futuro do Faust, junte-se ao `fauststream`_ Slack.
Se você tiver alguma sugestão, relatório de bug ou aborrecimento, relate-o ao nosso rastreador de problemas em https://github.com/robinhood/faust/issues/
Este software está licenciado sob a nova licença BSD. Consulte o arquivo LICENSE
no diretório de distribuição superior para obter o texto completo da licença.
O desenvolvimento do Faust acontece no GitHub: https://github.com/robinhood/faust
Você é altamente encorajado a participar do desenvolvimento de Fausto.
Certifique-se de ler também a seção Contribuindo para Faust na documentação.
Espera-se que todos que interagem nas bases de código, rastreadores de problemas, salas de bate-papo e listas de e-mail do projeto sigam o Código de Conduta Faust.
Como contribuidores e mantenedores desses projetos, e no interesse de promover uma comunidade aberta e acolhedora, nos comprometemos a respeitar todas as pessoas que contribuem por meio de relatórios de problemas, publicação de solicitações de recursos, atualização de documentação, envio de pull requests ou patches e outras atividades.
Estamos empenhados em tornar a participação nestes projetos uma experiência livre de assédio para todos, independentemente do nível de experiência, género, identidade e expressão de género, orientação sexual, deficiência, aparência pessoal, tamanho corporal, raça, etnia, idade, religião ou nacionalidade.
Exemplos de comportamento inaceitável por parte dos participantes incluem:
Os mantenedores do projeto têm o direito e a responsabilidade de remover, editar ou rejeitar comentários, commits, códigos, edições de wiki, problemas e outras contribuições que não estejam alinhadas com este Código de Conduta. Ao adotar este Código de Conduta, os mantenedores do projeto comprometem-se a aplicar estes princípios de forma justa e consistente a todos os aspectos do gerenciamento deste projeto. Os mantenedores do projeto que não seguirem ou não aplicarem o Código de Conduta poderão ser removidos permanentemente da equipe do projeto.
Este código de conduta aplica-se tanto nos espaços do projeto como nos espaços públicos quando um indivíduo representa o projeto ou a sua comunidade.
Instâncias de comportamento abusivo, de assédio ou de outra forma inaceitável podem ser relatadas abrindo um problema ou entrando em contato com um ou mais mantenedores do projeto.
Este Código de Conduta é adaptado do Contributor Covenant, versão 1.2.0, disponível em http://contributor-covenant.org/version/1/2/0/.