Tutoriales | Documentación API | ¡Chatea con nosotros en Slack! |
---|---|---|
Fugue es una interfaz unificada para informática distribuida que permite a los usuarios ejecutar código Python, Pandas y SQL en Spark, Dask y Ray con reescrituras mínimas .
La fuga se usa más comúnmente para:
Para ver cómo se compara Fugue con otros frameworks como dbt, Arrow, Ibis, PySpark Pandas, consulte las comparaciones.
La API Fugue es una colección de funciones que pueden ejecutarse en Pandas, Spark, Dask y Ray. La forma más sencilla de utilizar Fugue es la función transform()
. Esto permite a los usuarios paralelizar la ejecución de una única función llevándola a Spark, Dask o Ray. En el siguiente ejemplo, la función map_letter_to_food()
toma un mapeo y lo aplica en una columna. Hasta ahora, esto es solo Pandas y Python (sin Fugue).
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
Ahora, la función map_letter_to_food()
se lleva al motor de ejecución de Spark invocando la función transform()
de Fugue. El schema
de salida y params
se pasan a la llamada transform()
. El schema
es necesario porque es un requisito para los marcos distribuidos. Un esquema de "*"
a continuación significa que todas las columnas de entrada están en la salida.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
Esta sintaxis es más simple, limpia y fácil de mantener que el equivalente de PySpark. Al mismo tiempo, no se realizaron modificaciones en la función original basada en Pandas para llevarla a Spark. Todavía se puede utilizar en Pandas DataFrames. Fugue transform()
también admite Dask y Ray como motores de ejecución junto con el motor predeterminado basado en Pandas.
La API Fugue tiene una colección más amplia de funciones que también son compatibles con Spark, Dask y Ray. Por ejemplo, podemos usar load()
y save()
para crear un flujo de trabajo de un extremo a otro compatible con Spark, Dask y Ray. Para obtener la lista completa de funciones, consulte la API de nivel superior
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
Todas las funciones debajo del contexto se ejecutarán en el backend especificado. Esto facilita alternar entre ejecución local y ejecución distribuida.
FugueSQL es un lenguaje basado en SQL capaz de expresar flujos de trabajo de datos de un extremo a otro sobre Pandas, Spark y Dask. La función map_letter_to_food()
anterior se utiliza en la expresión SQL siguiente. A continuación se explica cómo utilizar una función definida por Python junto con la instrucción SQL SELECT
estándar.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
La fuga se puede instalar mediante pip o conda. Por ejemplo:
pip install fugue
Para utilizar Fugue SQL, se recomienda encarecidamente instalar el sql
extra:
pip install fugue[sql]
Además dispone de los siguientes extras de instalación:
Por ejemplo, un caso de uso común es:
pip install " fugue[duckdb,spark] "
Tenga en cuenta que si ya instaló Spark o DuckDB de forma independiente, Fugue puede usarlos automáticamente sin instalar extras.
La mejor manera de comenzar con Fugue es seguir los tutoriales de 10 minutos:
Para la API de nivel superior, consulte:
Los tutoriales también se pueden ejecutar en un entorno de cuaderno interactivo a través de Binder o Docker:
Tenga en cuenta que funciona lento en Binder porque la máquina en Binder no es lo suficientemente potente para un marco distribuido como Spark. Las ejecuciones paralelas pueden volverse secuenciales, por lo que algunos de los ejemplos de comparación de rendimiento no le darán los números correctos.
Alternativamente, deberías obtener un rendimiento decente ejecutando esta imagen de Docker en tu propia máquina:
docker run -p 8888:8888 fugueproject/tutorials:latest
Hay una extensión de cuaderno adjunta para FugueSQL que permite a los usuarios usar la magia celular %%fsql
. La extensión también proporciona resaltado de sintaxis para celdas FugueSQL. Funciona tanto para portátiles clásicos como para Jupyter Lab. Se pueden encontrar más detalles en las instrucciones de instalación.
Al ser una capa de abstracción, Fugue se puede utilizar sin problemas con muchos otros proyectos de código abierto.
Motores de Python:
Motores de FugueSQL:
Fugue está disponible como backend o puede integrarse con los siguientes proyectos:
Las extensiones registradas de terceros (principalmente para Fugue SQL) incluyen:
No dudes en enviarnos un mensaje por Slack. También tenemos instrucciones de contribución.
Vea algunas de las presentaciones y el contenido de nuestras últimas conferencias. Para obtener una lista más completa, consulte la página de Contenido en los tutoriales.