Cette bibliothèque est obsolète et n'est plus gérée ou prise en charge. Le projet communautaire actif actuel peut être trouvé sur https://github.com/faust-streaming/faust
Version: | 1.10.4 |
---|---|
Internet : | http://faust.readthedocs.io/ |
Télécharger: | http://pypi.org/project/faust |
Source: | http://github.com/robinhood/faust |
Mots-clés : | distribué, flux, asynchrone, traitement, données, file d'attente, gestion d'état |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust est une bibliothèque de traitement de flux, transférant les idées de Kafka Streams vers Python.
Il est utilisé chez Robinhood pour créer des systèmes distribués hautes performances et des pipelines de données en temps réel qui traitent des milliards d'événements chaque jour.
Faust fournit à la fois le traitement des flux et le traitement des événements , partageant des similitudes avec des outils tels que Kafka Streams, Apache Spark/Storm/Samza/Flink,
Il n'utilise pas de DSL, c'est juste du Python ! Cela signifie que vous pouvez utiliser toutes vos bibliothèques Python préférées lors du traitement du flux : NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++
Faust nécessite Python 3.6 ou version ultérieure pour la nouvelle syntaxe async/wait et les annotations de type variable.
Voici un exemple de traitement d'un flux de commandes entrantes :
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
Le décorateur d'agent définit un « processeur de flux » qui consomme essentiellement un sujet Kafka et fait quelque chose pour chaque événement qu'il reçoit.
L'agent est une fonction async def
, il peut donc également effectuer d'autres opérations de manière asynchrone, telles que des requêtes Web.
Ce système peut persister dans un état agissant comme une base de données. Les tables sont nommées magasins de clés/valeurs distribués que vous pouvez utiliser comme dictionnaires Python classiques.
Les tables sont stockées localement sur chaque machine à l'aide d'une base de données intégrée ultra rapide écrite en C++, appelée RocksDB.
Les tableaux peuvent également stocker des décomptes globaux qui sont éventuellement « fenêtrés » afin que vous puissiez suivre le « nombre de clics du dernier jour » ou le « nombre de clics au cours de la dernière heure ». Par exemple. Comme Kafka Streams, nous prenons en charge les fenêtres de temps tumultueuses, sautillantes et coulissantes, et les anciennes fenêtres peuvent expirer pour empêcher les données de se remplir.
Pour des raisons de fiabilité, nous utilisons un sujet Kafka comme « journal à écriture anticipée ». Chaque fois qu'une clé est modifiée, nous la publions dans le journal des modifications. Les nœuds de secours consomment ce journal des modifications pour conserver une réplique exacte des données et permettent une récupération instantanée en cas de panne de l'un des nœuds.
Pour l'utilisateur, une table n'est qu'un dictionnaire, mais les données sont conservées entre les redémarrages et répliquées sur les nœuds afin qu'en cas de basculement, d'autres nœuds puissent prendre le relais automatiquement.
Vous pouvez compter les pages vues par URL :
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Les données envoyées au sujet Kafka sont partitionnées, ce qui signifie que les clics seront partagés par URL de telle sorte que chaque décompte pour la même URL soit transmis à la même instance de travail Faust.
Faust prend en charge tout type de données de flux : octets, Unicode et structures sérialisées, mais est également livré avec des « Modèles » qui utilisent la syntaxe Python moderne pour décrire comment les clés et les valeurs des flux sont sérialisées :
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust est typé statiquement, en utilisant le vérificateur de type mypy
, afin que vous puissiez profiter des types statiques lors de l'écriture d'applications.
Le code source de Faust est petit, bien organisé et constitue une bonne ressource pour apprendre la mise en œuvre de Kafka Streams.
Faust est extrêmement simple à utiliser. Pour commencer à utiliser d'autres solutions de traitement de flux, vous avez des projets Hello World et des exigences d'infrastructure complexes. Faust ne nécessite que Kafka, le reste n'est que Python, donc si vous connaissez Python, vous pouvez déjà utiliser Faust pour effectuer du traitement de flux, et il peut s'intégrer à presque tout.
Voici l'une des applications les plus simples que vous puissiez réaliser :
importer des fautes salutation de classe (faust.Record): nom_de : str to_name : str app = faust.App('hello-app', courtier='kafka://localhost') topic = app.topic('bonjour-topic', value_type=Salutation) @app.agent (sujet) async def bonjour (salutations) : asynchrone pour les salutations dans les salutations : print(f'Bonjour de {greeting.from_name} à {greeting.to_name}') @app.timer(intervalle=1.0) async def example_sender (application): attendre bonjour.envoyer( value=Salutation(from_name='Faust', to_name='vous'), ) si __name__ == '__main__' : app.main()
Vous êtes probablement un peu intimidé par les mots-clés async et wait, mais vous n'avez pas besoin de savoir comment fonctionne asyncio
pour utiliser Faust : imitez simplement les exemples, et tout ira bien.
L'exemple d'application démarre deux tâches : l'une traite un flux, l'autre est un thread d'arrière-plan qui envoie des événements à ce flux. Dans une application réelle, votre système publiera des événements dans des sujets Kafka à partir desquels vos processeurs peuvent consommer, et le thread d'arrière-plan n'est nécessaire que pour alimenter en données notre exemple.
Vous pouvez installer Faust soit via le Python Package Index (PyPI), soit à partir des sources.
Pour installer en utilisant pip :
$ pip install -U faust
Faust définit également un groupe d'extensions setuptools
qui peuvent être utilisées pour installer Faust et les dépendances pour une fonctionnalité donnée.
Vous pouvez les spécifier dans vos exigences ou sur la ligne de commande pip
en utilisant des crochets. Séparez plusieurs bundles à l'aide d'une virgule :
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
Les forfaits suivants sont disponibles :
faust[rocksdb] : | pour utiliser RocksDB pour stocker l'état de la table Faust. Recommandé en production. |
---|
faust[redis] : | pour utiliser Redis_ comme simple backend de mise en cache (style Memcached). |
---|
faust[yaml] : | pour utiliser YAML et la bibliothèque PyYAML dans les flux. |
---|
faust[fast] : | pour installer toutes les extensions d'accélération C disponibles sur le noyau Faust. |
---|
faust[datadog] : | pour l'utilisation du moniteur Datadog Faust. |
---|---|
faust[statsd] : | pour utiliser le moniteur Statsd Faust. |
faust[uvloop] : | pour utiliser Faust avec uvloop . |
---|---|
faust[eventlet] : | pour utiliser Faust avec eventlet |
faust[debug] : | pour avoir utilisé aiomonitor pour connecter et déboguer un travailleur Faust en cours d'exécution. |
---|---|
faust[setproctitle] : | lorsque le module setproctitle est installé, le travailleur Faust l'utilisera pour définir un nom de processus plus agréable dans les listes ps / top . Également installé avec les bundles fast et debug . |
Téléchargez la dernière version de Faust depuis http://pypi.org/project/faust
Vous pouvez l'installer en faisant :
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
La dernière commande doit être exécutée en tant qu'utilisateur privilégié si vous n'utilisez pas actuellement de virtualenv.
Vous pouvez installer le dernier instantané de Faust à l'aide de la commande pip
suivante :
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
Oui! Utilisez eventlet
comme pont pour intégrer asyncio
.
eventlet
Cette approche fonctionne avec n'importe quelle bibliothèque Python bloquante pouvant fonctionner avec eventlet
.
L'utilisation eventlet
nécessite l'installation du module aioeventlet
, et vous pouvez l'installer sous forme de bundle avec Faust :
$ pip install -U faust[eventlet]
Ensuite, pour utiliser réellement eventlet comme boucle d'événement, vous devez soit utiliser l'argument -L <faust --loop>
du programme faust
:
$ faust -L eventlet -A myproj worker -l info
ou ajoutez import mode.loop.eventlet
en haut de votre script de point d'entrée :
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
Avertissement
Il est très important que ce soit tout en haut du module et qu'il s'exécute avant d'importer les bibliothèques.
Oui! Utilisez le pont tornado.platform.asyncio
: http://www.tornadoweb.org/en/stable/asyncio.html
Oui! Utilisez l'implémentation du réacteur asyncio
: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
Non. Faust nécessite Python 3.6 ou version ultérieure, car il utilise fortement les fonctionnalités introduites dans Python 3.6 (async, wait, annotations de type variable).
Vous devrez peut-être augmenter la limite du nombre maximum de fichiers ouverts. L'article suivant explique comment procéder sous OS X : https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust prend en charge kafka avec la version >= 0.10.
Pour des discussions sur l'utilisation, le développement et l'avenir de Faust, veuillez rejoindre le `fauststream`_ Slack.
Si vous avez des suggestions, des rapports de bogues ou des désagréments, veuillez les signaler à notre outil de suivi des problèmes à l'adresse https://github.com/robinhood/faust/issues/
Ce logiciel est sous licence New BSD. Consultez le fichier LICENSE
dans le répertoire de distribution principal pour le texte complet de la licence.
Le développement de Faust se déroule sur GitHub : https://github.com/robinhood/faust
Vous êtes fortement encouragé à participer au développement de Faust.
Assurez-vous également de lire la section Contribuer à Faust dans la documentation.
Toute personne interagissant dans les bases de code, les outils de suivi des problèmes, les forums de discussion et les listes de diffusion du projet doit suivre le code de conduite de Faust.
En tant que contributeurs et responsables de ces projets, et dans l'intérêt de favoriser une communauté ouverte et accueillante, nous nous engageons à respecter toutes les personnes qui contribuent en signalant des problèmes, en publiant des demandes de fonctionnalités, en mettant à jour la documentation, en soumettant des demandes d'extraction ou de correctifs et d'autres activités.
Nous nous engageons à faire de la participation à ces projets une expérience sans harcèlement pour tous, quels que soient le niveau d'expérience, le sexe, l'identité et l'expression de genre, l'orientation sexuelle, le handicap, l'apparence personnelle, la taille, la race, l'origine ethnique, l'âge, la religion ou nationalité.
Voici des exemples de comportements inacceptables de la part des participants :
Les responsables du projet ont le droit et la responsabilité de supprimer, modifier ou rejeter les commentaires, les validations, le code, les modifications du wiki, les problèmes et autres contributions qui ne sont pas alignés sur ce code de conduite. En adoptant ce code de conduite, les responsables du projet s'engagent à appliquer ces principes de manière équitable et cohérente à tous les aspects de la gestion de ce projet. Les responsables du projet qui ne respectent pas ou n'appliquent pas le code de conduite peuvent être définitivement exclus de l'équipe de projet.
Ce code de conduite s'applique à la fois dans les espaces du projet et dans les espaces publics lorsqu'un individu représente le projet ou sa communauté.
Les cas de comportement abusif, de harcèlement ou autrement inacceptable peuvent être signalés en ouvrant un problème ou en contactant un ou plusieurs responsables du projet.
Ce code de conduite est adapté du Contributor Covenant, version 1.2.0 disponible sur http://contributor-covenant.org/version/1/2/0/.