Tutoriais | Documentação da API | Converse conosco no Slack! |
---|---|---|
Fugue é uma interface unificada para computação distribuída que permite aos usuários executar código Python, Pandas e SQL em Spark, Dask e Ray com reescritas mínimas .
A fuga é mais comumente usada para:
Para ver como o Fugue se compara a outros frameworks como dbt, Arrow, Ibis, PySpark Pandas, veja as comparações
A API Fugue é uma coleção de funções que podem ser executadas em Pandas, Spark, Dask e Ray. A maneira mais simples de usar Fugue é a função transform()
. Isso permite que os usuários paralelizem a execução de uma única função trazendo-a para Spark, Dask ou Ray. No exemplo abaixo, a função map_letter_to_food()
pega um mapeamento e o aplica em uma coluna. Até agora são apenas Pandas e Python (sem Fuga).
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
Agora, a função map_letter_to_food()
é trazida para o mecanismo de execução do Spark invocando a função transform()
do Fugue. O schema
de saída e params
são passados para a chamada transform()
. O schema
é necessário porque é um requisito para estruturas distribuídas. Um esquema de "*"
abaixo significa que todas as colunas de entrada estão na saída.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
Essa sintaxe é mais simples, mais limpa e mais fácil de manter do que o equivalente do PySpark. Ao mesmo tempo, nenhuma edição foi feita na função original baseada no Pandas para trazê-la para o Spark. Ainda pode ser usado em Pandas DataFrames. Fugue transform()
também suporta Dask e Ray como mecanismos de execução junto com o mecanismo padrão baseado em Pandas.
A API Fugue possui uma coleção mais ampla de funções que também são compatíveis com Spark, Dask e Ray. Por exemplo, podemos usar load()
e save()
para criar um fluxo de trabalho ponta a ponta compatível com Spark, Dask e Ray. Para a lista completa de funções, consulte a API de nível superior
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
Todas as funções abaixo do contexto serão executadas no backend especificado. Isso facilita alternar entre execução local e execução distribuída.
FugueSQL é uma linguagem baseada em SQL capaz de expressar fluxos de trabalho de dados ponta a ponta em cima de Pandas, Spark e Dask. A função map_letter_to_food()
acima é usada na expressão SQL abaixo. Veja como usar uma função definida em Python junto com a instrução SQL SELECT
padrão.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
A fuga pode ser instalada através de pip ou conda. Por exemplo:
pip install fugue
Para usar o Fugue SQL, é altamente recomendável instalar o sql
extra:
pip install fugue[sql]
Também possui os seguintes extras de instalação:
Por exemplo, um caso de uso comum é:
pip install " fugue[duckdb,spark] "
Observe que se você já instalou o Spark ou o DuckDB de forma independente, o Fugue é capaz de usá-los automaticamente sem instalar os extras.
A melhor maneira de começar a usar o Fugue é seguir os tutoriais de 10 minutos:
Para a API de nível superior, consulte:
Os tutoriais também podem ser executados em um ambiente de notebook interativo através do Binder ou Docker:
Observe que ele funciona lentamente no fichário porque a máquina no fichário não é poderosa o suficiente para uma estrutura distribuída como o Spark. As execuções paralelas podem se tornar sequenciais, portanto, alguns exemplos de comparação de desempenho não fornecerão os números corretos.
Alternativamente, você deve obter um desempenho decente executando esta imagem Docker em sua própria máquina:
docker run -p 8888:8888 fugueproject/tutorials:latest
Há uma extensão de notebook para FugueSQL que permite aos usuários usar a magia da célula %%fsql
. A extensão também fornece destaque de sintaxe para células FugueSQL. Funciona tanto para notebook clássico quanto para Jupyter Lab. Mais detalhes podem ser encontrados nas instruções de instalação.
Por ser uma camada de abstração, o Fugue pode ser usado perfeitamente com muitos outros projetos de código aberto.
Back-ends Python:
Back-ends do FugueSQL:
O Fugue está disponível como backend ou pode ser integrado aos seguintes projetos:
Extensões de terceiros registradas (principalmente para Fugue SQL) incluem:
Sinta-se à vontade para nos enviar uma mensagem no Slack. Também temos instruções de contribuição.
Veja algumas de nossas apresentações e conteúdos de conferências mais recentes. Para uma lista mais completa, verifique a página Conteúdo nos tutoriais.