教程 | API文档 | 在 Slack 上与我们聊天! |
---|---|---|
Fugue 是分布式计算的统一接口,允许用户在 Spark、Dask 和 Ray 上执行 Python、Pandas 和 SQL 代码,只需最少的重写。
赋格曲最常用于:
要了解 Fugue 与 dbt、Arrow、Ibis、PySpark Pandas 等其他框架的比较,请参阅比较
Fugue API 是能够在 Pandas、Spark、Dask 和 Ray 上运行的函数集合。使用Fugue 最简单的方法是transform()
函数。这使得用户可以通过将单个函数引入 Spark、Dask 或 Ray 来并行执行该函数。在下面的示例中, map_letter_to_food()
函数接受映射并将其应用于列。到目前为止,这只是 Pandas 和 Python(没有 Fugue)。
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
现在, map_letter_to_food()
函数通过调用Fugue的transform()
函数被带入Spark执行引擎。输出schema
和params
被传递给transform()
调用。需要该schema
,因为它是分布式框架的要求。下面的"*"
模式表示所有输入列都在输出中。
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
此语法比 PySpark 的等效语法更简单、更清晰且更易于维护。同时,没有对原始的基于 Pandas 的函数进行任何编辑以将其引入 Spark。它仍然可以在 Pandas DataFrames 上使用。 Fugue transform()
还支持 Dask 和 Ray 作为执行引擎以及默认的基于 Pandas 的引擎。
Fugue API 拥有更广泛的函数集合,这些函数也与 Spark、Dask 和 Ray 兼容。例如,我们可以使用load()
和save()
创建与 Spark、Dask 和 Ray 兼容的端到端工作流程。有关函数的完整列表,请参阅顶级 API
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
上下文下的所有函数都将在指定的后端上运行。这使得在本地执行和分布式执行之间切换变得容易。
FugueSQL 是一种基于 SQL 的语言,能够在 Pandas、Spark 和 Dask 之上表达端到端数据工作流。下面的 SQL 表达式中使用了上面的map_letter_to_food()
函数。这是如何将 Python 定义的函数与标准 SQL SELECT
语句一起使用。
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue可以通过pip或conda安装。例如:
pip install fugue
为了使用 Fugue SQL,强烈建议安装sql
extra:
pip install fugue[sql]
它还具有以下安装附加功能:
例如,一个常见的用例是:
pip install " fugue[duckdb,spark] "
请注意,如果您已经独立安装了 Spark 或 DuckDB,Fugue 能够自动使用它们,而无需安装附加组件。
开始使用 Fugue 的最佳方法是完成 10 分钟的教程:
对于顶级 API,请参阅:
这些教程还可以通过binder或Docker在交互式笔记本环境中运行:
请注意,它在 Binder 上运行缓慢,因为 Binder 上的机器对于 Spark 等分布式框架来说不够强大。并行执行可能会变成顺序执行,因此某些性能比较示例不会为您提供正确的数字。
或者,您应该通过在自己的计算机上运行此 Docker 映像来获得不错的性能:
docker run -p 8888:8888 fugueproject/tutorials:latest
FugueSQL 有一个随附的笔记本扩展,可让用户使用%%fsql
单元魔法。该扩展还为 FugueSQL 单元提供语法突出显示。它适用于经典笔记本和 Jupyter Lab。更多详细信息可以在安装说明中找到。
作为一个抽象层,Fugue 可以与许多其他开源项目无缝地使用。
Python 后端:
FugueSQL 后端:
Fugue 可用作后端,或者可以与以下项目集成:
已注册的第 3 方扩展(主要针对 Fugue SQL)包括:
请随时在 Slack 上给我们留言。我们还有贡献说明。
查看我们的一些最新会议演示和内容。有关更完整的列表,请查看教程中的内容页面。