Diese Bibliothek ist veraltet und wird nicht mehr verwaltet oder unterstützt. Das aktuell aktive Community-Projekt finden Sie unter https://github.com/faust-streaming/faust
Version: | 1.10.4 |
---|---|
Web: | http://faust.readthedocs.io/ |
Herunterladen: | http://pypi.org/project/faust |
Quelle: | http://github.com/robinhood/faust |
Schlüsselwörter: | verteilt, Stream, asynchron, Verarbeitung, Daten, Warteschlange, Zustandsverwaltung |
# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust
Faust ist eine Stream-Verarbeitungsbibliothek, die die Ideen von Kafka Streams auf Python portiert.
Bei Robinhood wird es zum Aufbau leistungsstarker verteilter Systeme und Echtzeit-Datenpipelines verwendet, die täglich Milliarden von Ereignissen verarbeiten.
Faust bietet sowohl Stream-Verarbeitung als auch Ereignisverarbeitung und weist Ähnlichkeiten mit Tools wie Kafka Streams, Apache Spark/Storm/Samza/Flink auf.
Es verwendet kein DSL, es ist nur Python! Das bedeutet, dass Sie bei der Stream-Verarbeitung alle Ihre bevorzugten Python-Bibliotheken verwenden können: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++
Faust erfordert Python 3.6 oder höher für die neue async/await-Syntax und Variablentypanmerkungen.
Hier ist ein Beispiel für die Verarbeitung eines Stroms eingehender Bestellungen:
app = faust . App ( 'myapp' , broker = 'kafka://localhost' )
# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order ( faust . Record ):
account_id : str
amount : int
@ app . agent ( value_type = Order )
async def order ( orders ):
async for order in orders :
# process infinite stream of orders.
print ( f'Order for { order . account_id } : { order . amount } ' )
Der Agent-Dekorator definiert einen „Stream-Prozessor“, der im Wesentlichen aus einem Kafka-Thema konsumiert und für jedes empfangene Ereignis etwas tut.
Der Agent ist eine async def
Definitionsfunktion und kann daher auch andere Vorgänge asynchron ausführen, z. B. Webanfragen.
Dieses System kann den Status beibehalten und sich wie eine Datenbank verhalten. Tabellen sind benannte verteilte Schlüssel-/Wertspeicher, die Sie als reguläre Python-Wörterbücher verwenden können.
Tabellen werden lokal auf jedem Computer mithilfe einer superschnellen, in C++ geschriebenen eingebetteten Datenbank namens RocksDB gespeichert.
Tabellen können auch aggregierte Zählungen speichern, die optional „gefenstert“ werden, sodass Sie die „Anzahl der Klicks vom letzten Tag“ oder die „Anzahl der Klicks in der letzten Stunde“ verfolgen können. Zum Beispiel. Wie Kafka Streams unterstützen wir Tumbling-, Hopping- und Sliding-Zeitfenster, und alte Fenster können abgelaufen werden, um zu verhindern, dass sich Daten füllen.
Aus Gründen der Zuverlässigkeit verwenden wir ein Kafka-Thema als „Write-Ahead-Log“. Immer wenn ein Schlüssel geändert wird, veröffentlichen wir dies im Änderungsprotokoll. Standby-Knoten nutzen dieses Änderungsprotokoll, um eine exakte Kopie der Daten zu behalten und eine sofortige Wiederherstellung zu ermöglichen, falls einer der Knoten ausfällt.
Für den Benutzer ist eine Tabelle nur ein Wörterbuch, aber die Daten werden zwischen Neustarts beibehalten und über Knoten hinweg repliziert, sodass bei einem Failover andere Knoten automatisch übernehmen können.
Sie können Seitenaufrufe nach URL zählen:
# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app . topic ( 'clicks' , key_type = str , value_type = int )
# default value for missing URL will be 0 with `default=int`
counts = app . Table ( 'click_counts' , default = int )
@ app . agent ( click_topic )
async def count_click ( clicks ):
async for url , count in clicks . items ():
counts [ url ] += count
Die an das Kafka-Thema gesendeten Daten sind partitioniert, was bedeutet, dass die Klicks nach URL so aufgeteilt werden, dass jede Zählung für dieselbe URL an dieselbe Faust-Worker-Instanz übermittelt wird.
Faust unterstützt alle Arten von Stream-Daten: Bytes, Unicode und serialisierte Strukturen, verfügt aber auch über „Modelle“, die moderne Python-Syntax verwenden, um zu beschreiben, wie Schlüssel und Werte in Streams serialisiert werden:
# Order is a json serialized dictionary,
# having these fields:
class Order ( faust . Record ):
account_id : str
product_id : str
price : float
quantity : float = 1.0
orders_topic = app . topic ( 'orders' , key_type = str , value_type = Order )
@ app . agent ( orders_topic )
async def process_order ( orders ):
async for order in orders :
# process each order using regular Python
total_price = order . price * order . quantity
await send_order_received_email ( order . account_id , order )
Faust wird mithilfe des mypy
Typprüfers statisch typisiert, sodass Sie beim Schreiben von Anwendungen die Vorteile statischer Typen nutzen können.
Der Faust-Quellcode ist klein, gut organisiert und dient als gute Ressource zum Erlernen der Implementierung von Kafka Streams.
Faust ist äußerst einfach zu bedienen. Um andere Stream-Verarbeitungslösungen nutzen zu können, müssen Sie komplizierte Hello-World-Projekte und Infrastrukturanforderungen erfüllen. Faust benötigt nur Kafka, der Rest ist nur Python. Wenn Sie also Python kennen, können Sie Faust bereits für die Stream-Verarbeitung verwenden und es kann in nahezu alles integriert werden.
Hier ist eine der einfacheren Anwendungen, die Sie erstellen können:
Faust importieren Klassenbegrüßung(faust.Record): from_name: str to_name: str app = faust.App('hello-app', Broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting) @app.agent(thema) async def Hallo (Grüße): asynchron für Begrüßung in Begrüßungen: print(f'Hallo von {greeting.from_name} an {greeting.to_name}') @app.timer(Intervall=1,0) async def example_sender(app): warte auf hallo.send( value=Gruß(from_name='Faust', to_name='you'), ) if __name__ == '__main__': app.main()
Wahrscheinlich sind Sie von den Schlüsselwörtern „async“ und „await“ etwas eingeschüchtert, aber Sie müssen nicht wissen, wie asyncio
funktioniert, um Faust zu verwenden: ahmen Sie einfach die Beispiele nach, und alles wird gut.
Die Beispielanwendung startet zwei Aufgaben: Eine verarbeitet einen Stream, die andere ist ein Hintergrundthread, der Ereignisse an diesen Stream sendet. In einer realen Anwendung veröffentlicht Ihr System Ereignisse zu Kafka-Themen, die Ihre Prozessoren nutzen können, und der Hintergrundthread wird nur benötigt, um Daten in unser Beispiel einzuspeisen.
Sie können Faust entweder über den Python Package Index (PyPI) oder aus der Quelle installieren.
So installieren Sie mit pip:
$ pip install -U faust
Faust definiert außerdem eine Gruppe von setuptools
Erweiterungen, die zum Installieren von Faust und den Abhängigkeiten für eine bestimmte Funktion verwendet werden können.
Sie können diese in Ihren Anforderungen oder in der pip
-Befehlszeile angeben, indem Sie Klammern verwenden. Trennen Sie mehrere Bundles durch Komma:
$ pip install " faust[rocksdb] "
$ pip install " faust[rocksdb,uvloop,fast,redis] "
Folgende Pakete sind verfügbar:
faust[rocksdb] : | für die Verwendung von RocksDB zum Speichern des Faust-Tabellenstatus. Empfohlen in der Produktion. |
---|
faust[redis] : | für die Verwendung von Redis_ als einfaches Caching-Backend (Memcached-Stil). |
---|
faust[yaml] : | für die Verwendung von YAML und der PyYAML -Bibliothek in Streams. |
---|
faust[fast] : | zur Installation aller verfügbaren C-Speedup-Erweiterungen für den Faust-Kern. |
---|
faust[datadog] : | zur Verwendung des Datadog Faust Monitors. |
---|---|
faust[statsd] : | zur Verwendung des Statsd Faust Monitors. |
faust[uvloop] : | für die Verwendung von Faust mit uvloop . |
---|---|
faust[eventlet] : | für die Verwendung von Faust mit eventlet |
faust[debug] : | für die Verwendung aiomonitor zum Verbinden und Debuggen eines laufenden Faust-Workers. |
---|---|
faust[setproctitle] : | Wenn das setproctitle -Modul installiert ist, verwendet der Faust-Worker es, um einen schöneren Prozessnamen in ps / top -Listen festzulegen. Wird auch mit den fast und debug Bundles installiert. |
Laden Sie die neueste Version von Faust von http://pypi.org/project/faust herunter
Sie können es folgendermaßen installieren:
$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install
Der letzte Befehl muss als privilegierter Benutzer ausgeführt werden, wenn Sie derzeit keine virtuelle Umgebung verwenden.
Sie können den neuesten Snapshot von Faust mit dem folgenden pip
-Befehl installieren:
$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust
Ja! Verwenden Sie eventlet
als Brücke zur Integration mit asyncio
.
eventlet
verwenden Dieser Ansatz funktioniert mit jeder blockierenden Python-Bibliothek, die mit eventlet
arbeiten kann.
Für die Verwendung von eventlet
müssen Sie das Modul aioeventlet
installieren. Dieses können Sie als Bundle zusammen mit Faust installieren:
$ pip install -U faust[eventlet]
Um Eventlet dann tatsächlich als Ereignisschleife zu verwenden, müssen Sie entweder das Argument -L <faust --loop>
für das faust
-Programm verwenden:
$ faust -L eventlet -A myproj worker -l info
oder fügen Sie import mode.loop.eventlet
oben in Ihrem Einstiegspunktskript hinzu:
#!/usr/bin/env python3
import mode . loop . eventlet # noqa
Warnung
Es ist sehr wichtig, dass dies ganz oben im Modul steht und ausgeführt wird, bevor Sie Bibliotheken importieren.
Ja! Verwenden Sie die tornado.platform.asyncio
-Bridge: http://www.tornadoweb.org/en/stable/asyncio.html
Ja! Verwenden Sie die asyncio
Reaktor-Implementierung: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html
Nein. Faust erfordert Python 3.6 oder höher, da es in hohem Maße Funktionen nutzt, die in Python 3.6 eingeführt wurden (async,wait,Variablentypanmerkungen).
Möglicherweise müssen Sie den Grenzwert für die maximale Anzahl geöffneter Dateien erhöhen. Der folgende Beitrag erklärt, wie das unter OS X geht: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
Faust unterstützt Kafka mit Version >= 0.10.
Für Diskussionen über die Nutzung, Entwicklung und Zukunft von Faust treten Sie bitte dem „fauststream“_ Slack bei.
Wenn Sie Vorschläge, Fehlerberichte oder Ärgernisse haben, melden Sie diese bitte an unseren Issue-Tracker unter https://github.com/robinhood/faust/issues/
Diese Software ist unter der New BSD License lizenziert. Den vollständigen Lizenztext finden Sie in der Datei LICENSE
im obersten Distributionsverzeichnis.
Die Entwicklung von Faust findet auf GitHub statt: https://github.com/robinhood/faust
Wir laden Sie herzlich ein, sich an der Entwicklung von Faust zu beteiligen.
Lesen Sie unbedingt auch den Abschnitt „Beitrag zu Faust“ in der Dokumentation.
Von jedem, der in den Codebasen, Issue-Trackern, Chatrooms und Mailinglisten des Projekts interagiert, wird erwartet, dass er den Faust-Verhaltenskodex befolgt.
Als Mitwirkende und Betreuer dieser Projekte und im Interesse der Förderung einer offenen und einladenden Community verpflichten wir uns, alle Menschen zu respektieren, die durch das Melden von Problemen, das Posten von Funktionsanfragen, das Aktualisieren der Dokumentation, das Einreichen von Pull-Requests oder Patches und andere Aktivitäten einen Beitrag leisten.
Wir setzen uns dafür ein, dass die Teilnahme an diesen Projekten für alle ein belästigungsfreies Erlebnis wird, unabhängig von Erfahrungsstand, Geschlecht, Geschlechtsidentität und -ausdruck, sexueller Orientierung, Behinderung, persönlichem Aussehen, Körpergröße, Rasse, ethnischer Zugehörigkeit, Alter, Religion usw Nationalität.
Beispiele für inakzeptables Verhalten von Teilnehmern sind:
Projektbetreuer haben das Recht und die Verantwortung, Kommentare, Commits, Code, Wiki-Änderungen, Probleme und andere Beiträge, die nicht mit diesem Verhaltenskodex übereinstimmen, zu entfernen, zu bearbeiten oder abzulehnen. Durch die Übernahme dieses Verhaltenskodex verpflichten sich Projektbetreuer dazu, diese Grundsätze in allen Aspekten der Verwaltung dieses Projekts fair und konsequent anzuwenden. Projektbetreuer, die den Verhaltenskodex nicht befolgen oder durchsetzen, können dauerhaft aus dem Projektteam entfernt werden.
Dieser Verhaltenskodex gilt sowohl innerhalb von Projekträumen als auch im öffentlichen Raum, wenn eine Einzelperson das Projekt oder seine Gemeinschaft vertritt.
Fälle von missbräuchlichem, belästigendem oder anderweitig inakzeptablem Verhalten können gemeldet werden, indem Sie ein Problem eröffnen oder einen oder mehrere Projektbetreuer kontaktieren.
Dieser Verhaltenskodex basiert auf dem Contributor Covenant, Version 1.2.0, verfügbar unter http://contributor-covenant.org/version/1/2/0/.