Tutoriels | Documentation API | Discutez avec nous sur Slack ! |
---|---|---|
Fugue est une interface unifiée pour l'informatique distribuée qui permet aux utilisateurs d'exécuter du code Python, Pandas et SQL sur Spark, Dask et Ray avec un minimum de réécritures .
La fugue est le plus souvent utilisée pour :
Pour voir comment Fugue se compare à d'autres frameworks comme dbt, Arrow, Ibis, PySpark Pandas, consultez les comparaisons
L'API Fugue est un ensemble de fonctions capables de s'exécuter sur Pandas, Spark, Dask et Ray. La façon la plus simple d'utiliser Fugue est la fonction transform()
. Cela permet aux utilisateurs de paralléliser l'exécution d'une seule fonction en l'apportant à Spark, Dask ou Ray. Dans l'exemple ci-dessous, la fonction map_letter_to_food()
récupère un mappage et l'applique sur une colonne. Ce ne sont que Pandas et Python jusqu'à présent (sans Fugue).
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
Désormais, la fonction map_letter_to_food()
est apportée au moteur d'exécution Spark en appelant la fonction transform()
de Fugue. Le schema
de sortie et params
sont transmis à l'appel transform()
. Le schema
est nécessaire car il constitue une exigence pour les frameworks distribués. Un schéma de "*"
ci-dessous signifie que toutes les colonnes d'entrée sont dans la sortie.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
Cette syntaxe est plus simple, plus propre et plus maintenable que l'équivalent PySpark. Dans le même temps, aucune modification n’a été apportée à la fonction originale basée sur Pandas pour l’intégrer à Spark. Il est toujours utilisable sur les Pandas DataFrames. Fugue transform()
prend également en charge Dask et Ray comme moteurs d'exécution aux côtés du moteur par défaut basé sur Pandas.
L'API Fugue dispose d'une collection plus large de fonctions qui sont également compatibles avec Spark, Dask et Ray. Par exemple, nous pouvons utiliser load()
et save()
pour créer un workflow de bout en bout compatible avec Spark, Dask et Ray. Pour la liste complète des fonctions, consultez l'API de niveau supérieur
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
Toutes les fonctions sous le contexte s'exécuteront sur le backend spécifié. Cela facilite le basculement entre l’exécution locale et l’exécution distribuée.
FugueSQL est un langage basé sur SQL capable d'exprimer des flux de données de bout en bout au-dessus de Pandas, Spark et Dask. La fonction map_letter_to_food()
ci-dessus est utilisée dans l'expression SQL ci-dessous. Voici comment utiliser une fonction définie par Python avec l'instruction SQL SELECT
standard.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue peut être installé via pip ou conda. Par exemple:
pip install fugue
Afin d'utiliser Fugue SQL, il est fortement recommandé d'installer le supplément sql
:
pip install fugue[sql]
Il dispose également des extras d'installation suivants :
Par exemple, un cas d'utilisation courant est :
pip install " fugue[duckdb,spark] "
Notez que si vous avez déjà installé Spark ou DuckDB indépendamment, Fugue est capable de les utiliser automatiquement sans installer les extras.
La meilleure façon de débuter avec Fugue est de suivre les tutoriels de 10 minutes :
Pour l'API de niveau supérieur, voir :
Les didacticiels peuvent également être exécutés dans un environnement de bloc-notes interactif via Binder ou Docker :
Notez qu'il fonctionne lentement sur Binder car la machine sur Binder n'est pas assez puissante pour un framework distribué tel que Spark. Les exécutions parallèles peuvent devenir séquentielles, de sorte que certains exemples de comparaison de performances ne vous donneront pas les chiffres corrects.
Alternativement, vous devriez obtenir des performances décentes en exécutant cette image Docker sur votre propre ordinateur :
docker run -p 8888:8888 fugueproject/tutorials:latest
Il existe une extension de bloc-notes pour FugueSQL qui permet aux utilisateurs d'utiliser la magie des cellules %%fsql
. L'extension fournit également une coloration syntaxique pour les cellules FugueSQL. Il fonctionne à la fois pour le notebook classique et pour Jupyter Lab. Plus de détails peuvent être trouvés dans les instructions d'installation.
En tant que couche d'abstraction, Fugue peut être utilisé de manière transparente avec de nombreux autres projets open source.
Moteurs Python :
Backends FugueSQL :
Fugue est disponible en backend ou peut s'intégrer aux projets suivants :
Les extensions tierces enregistrées (principalement pour Fugue SQL) incluent :
N'hésitez pas à nous envoyer un message sur Slack. Nous avons également des instructions de contribution.
Consultez certaines de nos dernières présentations et contenus de conférences. Pour une liste plus complète, consultez la page Contenu dans les didacticiels.