チュートリアル | APIドキュメント | Slack でチャットしましょう! |
---|---|---|
Fugue は、ユーザーが最小限の書き換えで Python、Pandas、SQL コードを Spark、Dask、Ray 上で実行できるようにする分散コンピューティング用の統合インターフェイスです。
フーガは次の目的で最も一般的に使用されます。
Fugue が dbt、Arrow、Ibis、PySpark Pandas などの他のフレームワークとどのように比較されるかを確認するには、比較を参照してください。
Fugue API は、Pandas、Spark、Dask、Ray で実行できる関数のコレクションです。 Fugue を使用する最も簡単な方法は、 transform()
関数です。これにより、ユーザーは単一関数を Spark、Dask、または Ray に持ち込んで並列実行できるようになります。以下の例では、 map_letter_to_food()
関数がマッピングを受け取り、それを列に適用します。これは今のところ Pandas と Python だけです (Fugue はありません)。
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
これで、Fugue のtransform()
関数を呼び出すことによって、 map_letter_to_food()
関数が Spark 実行エンジンに取り込まれます。出力schema
とparams
、 transform()
呼び出しに渡されます。 schema
が必要なのは、分散フレームワークの要件であるためです。以下のスキーマ"*"
は、すべての入力列が出力に含まれることを意味します。
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
この構文は、同等の PySpark よりもシンプルでわかりやすく、保守しやすくなっています。同時に、元の Pandas ベースの関数を Spark に導入するための編集は行われませんでした。 Pandas DataFrame では引き続き使用できます。 Fugue transform()
デフォルトの Pandas ベースのエンジンに加えて、実行エンジンとして Dask と Ray もサポートします。
Fugue API には、Spark、Dask、Ray とも互換性のある幅広い関数のコレクションがあります。たとえば、 load()
とsave()
使用して、Spark、Dask、Ray と互換性のあるエンドツーエンドのワークフローを作成できます。関数の完全なリストについては、トップレベル API を参照してください。
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
コンテキストの下にあるすべての関数は、指定されたバックエンドで実行されます。これにより、ローカル実行と分散実行を簡単に切り替えることができます。
FugueSQL は、Pandas、Spark、および Dask 上でエンドツーエンドのデータ ワークフローを表現できる SQL ベースの言語です。上記のmap_letter_to_food()
関数は、以下の SQL 式で使用されています。これは、標準 SQL SELECT
ステートメントとともに Python 定義関数を使用する方法です。
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue は pip または conda を通じてインストールできます。例えば:
pip install fugue
Fugue SQL を使用するには、追加のsql
をインストールすることを強くお勧めします。
pip install fugue[sql]
次のインストール用の追加機能もあります。
たとえば、一般的な使用例は次のとおりです。
pip install " fugue[duckdb,spark] "
すでに Spark または DuckDB を個別にインストールしている場合、Fugue は追加機能をインストールしなくてもそれらを自動的に使用できることに注意してください。
Fugue を使い始める最良の方法は、10 分間のチュートリアルに取り組むことです。
最上位 API については、以下を参照してください。
チュートリアルは、バインダーまたは Docker を介して対話型ノートブック環境で実行することもできます。
バインダー上のマシンは Spark などの分散フレームワークに対して十分強力ではないため、バインダー上では実行速度が遅いことに注意してください。並列実行は順次実行になる可能性があるため、一部のパフォーマンス比較例では正しい数値が得られません。
あるいは、この Docker イメージを自分のマシンで実行すると、適切なパフォーマンスが得られるはずです。
docker run -p 8888:8888 fugueproject/tutorials:latest
FugueSQL には、ユーザーが%%fsql
セル マジックを使用できるようにするノートブック拡張機能が付属しています。この拡張機能では、FugueSQL セルの構文の強調表示も提供します。クラシック ノートブックと Jupyter Lab の両方で機能します。詳細については、インストール手順を参照してください。
Fugue は抽象化レイヤーであるため、他の多くのオープンソース プロジェクトとシームレスに使用できます。
Python バックエンド:
FugueSQL バックエンド:
Fugue はバックエンドとして利用できるか、次のプロジェクトと統合できます。
登録されているサードパーティ拡張機能 (主に Fugue SQL 用) には次のものがあります。
Slack でお気軽にメッセージをお送りください。寄付に関する指示もあります。
最新のカンファレンスのプレゼンテーションとコンテンツの一部をご覧ください。より完全なリストについては、チュートリアルのコンテンツ ページを確認してください。