Tutorial | Dokumentasi API | Ngobrol dengan kami saat santai! |
---|---|---|
Fugue adalah antarmuka terpadu untuk komputasi terdistribusi yang memungkinkan pengguna mengeksekusi kode Python, Pandas, dan SQL pada Spark, Dask, dan Ray dengan penulisan ulang minimal .
Fugue paling sering digunakan untuk:
Untuk melihat bagaimana Fugue dibandingkan dengan framework lain seperti dbt, Arrow, Ibis, PySpark Pandas, lihat perbandingannya
Fugue API adalah kumpulan fungsi yang mampu berjalan di Pandas, Spark, Dask, dan Ray. Cara paling sederhana untuk menggunakan Fugue adalah fungsi transform()
. Hal ini memungkinkan pengguna memparalelkan eksekusi satu fungsi dengan membawanya ke Spark, Dask, atau Ray. Pada contoh di bawah, fungsi map_letter_to_food()
mengambil pemetaan dan menerapkannya pada kolom. Sejauh ini hanya Panda dan Python (tanpa Fugue).
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
Sekarang, fungsi map_letter_to_food()
dibawa ke mesin eksekusi Spark dengan memanggil fungsi transform()
dari Fugue. schema
dan params
keluaran diteruskan ke panggilan transform()
. schema
ini diperlukan karena merupakan persyaratan untuk kerangka kerja terdistribusi. Skema "*"
di bawah berarti semua kolom masukan ada di keluaran.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
Sintaks ini lebih sederhana, lebih bersih, dan lebih mudah dipelihara dibandingkan padanan PySpark. Pada saat yang sama, tidak ada pengeditan yang dilakukan pada fungsi asli berbasis Pandas untuk membawanya ke Spark. Itu masih dapat digunakan di Pandas DataFrames. Fugue transform()
juga mendukung Dask dan Ray sebagai mesin eksekusi bersama dengan mesin default berbasis Pandas.
Fugue API memiliki koleksi fungsi yang lebih luas yang juga kompatibel dengan Spark, Dask, dan Ray. Misalnya, kita dapat menggunakan load()
dan save()
untuk membuat alur kerja end-to-end yang kompatibel dengan Spark, Dask, dan Ray. Untuk daftar lengkap fungsi, lihat Top Level API
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
Semua fungsi di bawah konteks akan berjalan pada backend yang ditentukan. Hal ini memudahkan peralihan antara eksekusi lokal dan eksekusi terdistribusi.
FugueSQL adalah bahasa berbasis SQL yang mampu mengekspresikan alur kerja data ujung ke ujung di atas Pandas, Spark, dan Dask. Fungsi map_letter_to_food()
di atas digunakan dalam ekspresi SQL di bawah. Ini adalah cara menggunakan fungsi yang ditentukan Python bersama dengan pernyataan SQL SELECT
standar.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue dapat diinstal melalui pip atau conda. Misalnya:
pip install fugue
Untuk menggunakan Fugue SQL, sangat disarankan untuk menginstal sql
ekstra:
pip install fugue[sql]
Ia juga memiliki tambahan instalasi berikut:
Misalnya kasus penggunaan yang umum adalah:
pip install " fugue[duckdb,spark] "
Catatan jika Anda sudah menginstal Spark atau DuckDB secara mandiri, Fugue dapat menggunakannya secara otomatis tanpa menginstal tambahan.
Cara terbaik untuk memulai Fugue adalah dengan mengikuti tutorial 10 menit:
Untuk API tingkat atas, lihat:
Tutorial juga dapat dijalankan di lingkungan notebook interaktif melalui binder atau Docker:
Perhatikan bahwa ini berjalan lambat pada binder karena mesin pada binder tidak cukup kuat untuk kerangka kerja terdistribusi seperti Spark. Eksekusi paralel dapat menjadi berurutan, sehingga beberapa contoh perbandingan kinerja tidak akan memberikan Anda angka yang benar.
Alternatifnya, Anda harus mendapatkan kinerja yang layak dengan menjalankan image Docker ini di mesin Anda sendiri:
docker run -p 8888:8888 fugueproject/tutorials:latest
Ada ekstensi notebook yang menyertainya untuk FugueSQL yang memungkinkan pengguna menggunakan keajaiban sel %%fsql
. Ekstensi ini juga menyediakan penyorotan sintaksis untuk sel FugueSQL. Ini berfungsi untuk notebook klasik dan Jupyter Lab. Detail lebih lanjut dapat ditemukan di petunjuk instalasi.
Dengan menjadi lapisan abstraksi, Fugue dapat digunakan dengan banyak proyek sumber terbuka lainnya dengan lancar.
Backend Python:
Backend FugueSQL:
Fugue tersedia sebagai backend atau dapat diintegrasikan dengan proyek berikut:
Ekstensi pihak ketiga yang terdaftar (terutama untuk Fugue SQL) meliputi:
Jangan ragu untuk mengirim pesan kepada kami di Slack. Kami juga memiliki instruksi kontribusi.
Lihat beberapa presentasi dan konten konferensi terbaru kami. Untuk daftar lebih lengkap, periksa halaman Konten di tutorial.