Tutorials | API-Dokumentation | Chatten Sie mit uns auf Slack! |
---|---|---|
Fugue ist eine einheitliche Schnittstelle für verteiltes Computing, die es Benutzern ermöglicht, Python-, Pandas- und SQL-Code auf Spark, Dask und Ray mit minimalem Umschreiben auszuführen .
Fuge wird am häufigsten verwendet für:
Um zu sehen, wie Fugue im Vergleich zu anderen Frameworks wie dbt, Arrow, Ibis, PySpark Pandas abschneidet, sehen Sie sich die Vergleiche an
Die Fugue-API ist eine Sammlung von Funktionen, die auf Pandas, Spark, Dask und Ray ausgeführt werden können. Die einfachste Möglichkeit, Fuge zu verwenden, ist die Funktion transform()
. Dadurch können Benutzer die Ausführung einer einzelnen Funktion parallelisieren, indem sie sie zu Spark, Dask oder Ray bringen. Im folgenden Beispiel nimmt die Funktion map_letter_to_food()
eine Zuordnung auf und wendet sie auf eine Spalte an. Dies sind bisher nur Pandas und Python (ohne Fuge).
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
Jetzt wird die Funktion map_letter_to_food()
durch Aufrufen der Funktion transform()
von Fugue zur Spark-Ausführungs-Engine gebracht. Das schema
und params
werden an den transform()
Aufruf übergeben. Das schema
wird benötigt, da es eine Voraussetzung für verteilte Frameworks ist. Ein Schema von "*"
unten bedeutet, dass alle Eingabespalten in der Ausgabe enthalten sind.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
Diese Syntax ist einfacher, sauberer und wartbarer als das PySpark-Äquivalent. Gleichzeitig wurden keine Änderungen an der ursprünglichen Pandas-basierten Funktion vorgenommen, um sie in Spark zu integrieren. Es ist weiterhin auf Pandas DataFrames verwendbar. Fugue transform()
unterstützt neben der standardmäßigen Pandas-basierten Engine auch Dask und Ray als Ausführungs-Engines.
Die Fugue-API verfügt über eine breitere Sammlung von Funktionen, die auch mit Spark, Dask und Ray kompatibel sind. Beispielsweise können wir load()
und save()
verwenden, um einen End-to-End-Workflow zu erstellen, der mit Spark, Dask und Ray kompatibel ist. Die vollständige Liste der Funktionen finden Sie in der Top Level API
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
Alle Funktionen unterhalb des Kontexts werden auf dem angegebenen Backend ausgeführt. Dies erleichtert das Umschalten zwischen lokaler und verteilter Ausführung.
FugueSQL ist eine SQL-basierte Sprache, die End-to-End-Datenworkflows auf Pandas, Spark und Dask ausdrücken kann. Die obige Funktion map_letter_to_food()
wird im folgenden SQL-Ausdruck verwendet. So verwenden Sie eine von Python definierte Funktion zusammen mit der Standard-SQL SELECT
-Anweisung.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue kann über Pip oder Conda installiert werden. Zum Beispiel:
pip install fugue
Um Fugue SQL nutzen zu können, wird dringend empfohlen, das sql
Extra zu installieren:
pip install fugue[sql]
Es verfügt außerdem über die folgenden Installations-Extras:
Ein häufiger Anwendungsfall ist beispielsweise:
pip install " fugue[duckdb,spark] "
Beachten Sie, dass Fugue diese automatisch verwenden kann, ohne die Extras installieren zu müssen, wenn Sie Spark oder DuckDB bereits unabhängig voneinander installiert haben.
Der beste Einstieg in die Fuge ist das Durcharbeiten der 10-minütigen Tutorials:
Informationen zur API der obersten Ebene finden Sie unter:
Die Tutorials können auch in einer interaktiven Notebook-Umgebung über Binder oder Docker ausgeführt werden:
Beachten Sie, dass es auf Binder langsam läuft, da die Maschine auf Binder nicht leistungsstark genug für ein verteiltes Framework wie Spark ist. Parallele Ausführungen können sequentiell werden, sodass einige der Leistungsvergleichsbeispiele nicht die richtigen Zahlen liefern.
Alternativ sollten Sie eine anständige Leistung erzielen, indem Sie dieses Docker-Image auf Ihrem eigenen Computer ausführen:
docker run -p 8888:8888 fugueproject/tutorials:latest
Es gibt eine begleitende Notebook-Erweiterung für FugueSQL, mit der Benutzer die Zellenmagie %%fsql
verwenden können. Die Erweiterung bietet auch Syntaxhervorhebung für FugueSQL-Zellen. Es funktioniert sowohl für klassische Notebooks als auch für Jupyter Lab. Weitere Details finden Sie in der Installationsanleitung.
Da es sich um eine Abstraktionsschicht handelt, kann Fugue nahtlos mit vielen anderen Open-Source-Projekten verwendet werden.
Python-Backends:
FugueSQL-Backends:
Fugue ist als Backend verfügbar oder kann in die folgenden Projekte integriert werden:
Zu den registrierten Erweiterungen von Drittanbietern (hauptsächlich für Fugue SQL) gehören:
Schreiben Sie uns gerne eine Nachricht auf Slack. Wir haben auch Anleitungen zum Mitmachen.
Sehen Sie sich einige unserer neuesten Konferenzpräsentationen und -inhalte an. Eine vollständigere Liste finden Sie auf der Inhaltsseite in den Tutorials.