教學 | API文件 | 在 Slack 上與我們聊天! |
---|---|---|
Fugue 是分散式運算的統一接口,允許使用者在 Spark、Dask 和 Ray 上執行 Python、Pandas 和 SQL 程式碼,只需最少的重寫。
賦格曲最常用於:
要了解 Fugue 與 dbt、Arrow、Ibis、PySpark Pandas 等其他框架的比較,請參閱比較
Fugue API 是能夠在 Pandas、Spark、Dask 和 Ray 上運行的函數集合。使用Fugue 最簡單的方法是transform()
函數。這使得用戶可以透過將單一函數引入 Spark、Dask 或 Ray 來並行執行該函數。在下面的範例中, map_letter_to_food()
函數接受映射並將其應用於列。到目前為止,這只是 Pandas 和 Python(沒有 Fugue)。
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
現在, map_letter_to_food()
函數透過呼叫Fugue的transform()
函數被帶入Spark執行引擎。輸出schema
和params
傳遞給transform()
呼叫。需要該schema
,因為它是分散式框架的要求。下面的"*"
模式表示所有輸入列都在輸出中。
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
此語法比 PySpark 的等效語法更簡單、更清晰且更易於維護。同時,沒有對原始的基於 Pandas 的函數進行任何編輯以將其引入 Spark。它仍然可以在 Pandas DataFrames 上使用。 Fugue transform()
也支援 Dask 和 Ray 作為執行引擎以及預設的基於 Pandas 的引擎。
Fugue API 擁有更廣泛的函數集合,這些函數也與 Spark、Dask 和 Ray 相容。例如,我們可以使用load()
和save()
來建立與 Spark、Dask 和 Ray 相容的端對端工作流程。有關函數的完整列表,請參閱頂級 API
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
上下文下的所有函數都將在指定的後端上運行。這使得在本地執行和分散式執行之間切換變得容易。
FugueSQL 是一種基於 SQL 的語言,能夠在 Pandas、Spark 和 Dask 之上表達端對端資料工作流程。下面的 SQL 表達式中使用了上面的map_letter_to_food()
函數。這是如何將 Python 定義的函數與標準 SQL SELECT
語句一起使用。
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue可以透過pip或conda安裝。例如:
pip install fugue
為了使用 Fugue SQL,強烈建議安裝sql
extra:
pip install fugue[sql]
它還具有以下安裝附加功能:
例如,一個常見的用例是:
pip install " fugue[duckdb,spark] "
請注意,如果您已經獨立安裝了 Spark 或 DuckDB,Fugue 能夠自動使用它們,而無需安裝附加元件。
開始使用 Fugue 的最佳方法是完成 10 分鐘的教學:
對於頂級 API,請參閱:
這些教程還可以透過binder或Docker在互動式筆記本環境中運行:
請注意,它在 Binder 上運行緩慢,因為 Binder 上的機器對於 Spark 等分散式框架來說不夠強大。並行執行可能會變成順序執行,因此某些效能比較範例不會為您提供正確的數字。
或者,您應該透過在自己的電腦上執行此 Docker 映像來獲得不錯的效能:
docker run -p 8888:8888 fugueproject/tutorials:latest
FugueSQL 有一個隨附的筆記本擴展,可讓使用者使用%%fsql
單元魔法。該擴充功能還為 FugueSQL 單元提供語法突出顯示。它適用於經典筆記本和 Jupyter Lab。更多詳細資訊可以在安裝說明中找到。
作為一個抽象層,Fugue 可以與許多其他開源專案無縫地使用。
Python 後端:
FugueSQL 後端:
Fugue 可用作後端,或可與下列項目整合:
已註冊的第 3 方擴充(主要針對 Fugue SQL)包括:
請隨時在 Slack 上給我們留言。我們還有貢獻說明。
查看我們的一些最新會議演示和內容。有關更完整的列表,請查看教程中的內容頁面。