บทช่วยสอน | เอกสาร API | พูดคุยกับเราในหย่อน! |
---|---|---|
Fugue เป็นอินเทอร์เฟซแบบรวมสำหรับการประมวลผลแบบกระจายที่ให้ผู้ใช้สามารถรันโค้ด Python, Pandas และ SQL บน Spark, Dask และ Ray ด้วยการเขียนซ้ำเพียงเล็กน้อย
Fugue มักใช้สำหรับ:
หากต้องการดูว่า Fugue เปรียบเทียบกับเฟรมเวิร์กอื่นๆ เช่น dbt, Arrow, Ibis, PySpark Pandas อย่างไร โปรดดูการเปรียบเทียบ
Fugue API คือชุดของฟังก์ชันที่สามารถทำงานบน Pandas, Spark, Dask และ Ray วิธีที่ง่ายที่สุดในการใช้ Fugue คือฟังก์ชัน transform()
ซึ่งช่วยให้ผู้ใช้สามารถขนานการทำงานของฟังก์ชันเดียวได้โดยนำไปที่ Spark, Dask หรือ Ray ในตัวอย่างด้านล่าง ฟังก์ชัน map_letter_to_food()
รับการแมปและนำไปใช้กับคอลัมน์ จนถึงตอนนี้เป็นเพียง Pandas และ Python (ไม่มี Fugue)
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
ตอนนี้ ฟังก์ชัน map_letter_to_food()
ถูกนำไปยังเอ็นจิ้นการประมวลผล Spark โดยการเรียกใช้ฟังก์ชัน transform()
ของ Fugue schema
และ params
เอาต์พุตจะถูกส่งผ่านไปยังการเรียก transform()
จำเป็นต้องมี schema
เนื่องจากเป็นข้อกำหนดสำหรับเฟรมเวิร์กแบบกระจาย สคีมาของ "*"
ด้านล่างหมายความว่าคอลัมน์อินพุตทั้งหมดอยู่ในเอาต์พุต
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
ไวยากรณ์นี้ง่ายกว่า สะอาดกว่า และบำรุงรักษาได้ดีกว่า PySpark ที่เทียบเท่า ในเวลาเดียวกัน ไม่มีการแก้ไขฟังก์ชันที่ใช้ Pandas ดั้งเดิมเพื่อนำมาสู่ Spark มันยังคงใช้งานได้บน Pandas DataFrames Fugue transform()
ยังรองรับ Dask และ Ray เป็นเครื่องมือดำเนินการควบคู่ไปกับกลไกเริ่มต้นที่ใช้ Pandas
Fugue API มีคอลเลกชันฟังก์ชันที่กว้างขึ้นซึ่งเข้ากันได้กับ Spark, Dask และ Ray ตัวอย่างเช่น เราสามารถใช้ load()
และ save()
เพื่อสร้างเวิร์กโฟลว์แบบ end-to-end ที่เข้ากันได้กับ Spark, Dask และ Ray สำหรับรายการฟังก์ชันทั้งหมด โปรดดูที่ Top Level API
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
ฟังก์ชันทั้งหมดภายใต้บริบทจะทำงานบนแบ็กเอนด์ที่ระบุ ทำให้ง่ายต่อการสลับระหว่างการดำเนินการภายในและการดำเนินการแบบกระจาย
FugueSQL เป็นภาษา SQL ที่สามารถแสดงเวิร์กโฟลว์ข้อมูลแบบ end-to-end นอกเหนือจาก Pandas, Spark และ Dask ฟังก์ชัน map_letter_to_food()
ด้านบนใช้ในนิพจน์ SQL ด้านล่าง นี่คือวิธีการใช้ฟังก์ชันที่กำหนดโดย Python ร่วมกับคำสั่ง SQL SELECT
มาตรฐาน
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue สามารถติดตั้งผ่าน pip หรือ conda ตัวอย่างเช่น:
pip install fugue
เพื่อที่จะใช้ Fugue SQL ขอแนะนำอย่างยิ่งให้ติดตั้ง sql
เพิ่มเติม:
pip install fugue[sql]
นอกจากนี้ยังมีคุณสมบัติพิเศษในการติดตั้งดังต่อไปนี้:
ตัวอย่างเช่น กรณีการใช้งานทั่วไปคือ:
pip install " fugue[duckdb,spark] "
โปรดทราบว่าหากคุณติดตั้ง Spark หรือ DuckDB แยกจากกันแล้ว Fugue จะสามารถใช้งานได้โดยอัตโนมัติโดยไม่ต้องติดตั้งส่วนเสริม
วิธีที่ดีที่สุดในการเริ่มต้นใช้งาน Fugue คือการใช้บทช่วยสอนความยาว 10 นาที:
สำหรับ API ระดับบนสุด โปรดดูที่:
บทช่วยสอนยังสามารถเรียกใช้ในสภาพแวดล้อมสมุดบันทึกแบบโต้ตอบผ่าน Binder หรือ Docker:
โปรดทราบว่ามันทำงานช้าบน Binder เนื่องจากเครื่องบน Binder ไม่ทรงพลังเพียงพอสำหรับเฟรมเวิร์กแบบกระจายเช่น Spark การดำเนินการแบบขนานอาจกลายเป็นลำดับได้ ดังนั้นตัวอย่างการเปรียบเทียบประสิทธิภาพบางตัวอย่างจะไม่ให้ตัวเลขที่ถูกต้องแก่คุณ
หรืออีกทางหนึ่ง คุณควรได้รับประสิทธิภาพที่เหมาะสมโดยการรันอิมเมจ Docker นี้บนเครื่องของคุณเอง:
docker run -p 8888:8888 fugueproject/tutorials:latest
มีส่วนขยายโน้ตบุ๊กที่มาพร้อมกับ FugueSQL ที่ให้ผู้ใช้สามารถใช้เซลล์เวทย์มนตร์ %%fsql
ได้ ส่วนขยายยังจัดให้มีการเน้นไวยากรณ์สำหรับเซลล์ FugueSQL ใช้ได้กับทั้งโน้ตบุ๊กคลาสสิกและ Jupyter Lab รายละเอียดเพิ่มเติมสามารถพบได้ในคำแนะนำในการติดตั้ง
ด้วยการเป็นเลเยอร์นามธรรม ทำให้ Fugue สามารถใช้กับโปรเจ็กต์โอเพ่นซอร์สอื่นๆ มากมายได้อย่างราบรื่น
แบ็กเอนด์หลาม:
แบ็กเอนด์ FugueSQL:
Fugue พร้อมใช้งานเป็นแบ็กเอนด์หรือสามารถรวมเข้ากับโปรเจ็กต์ต่อไปนี้:
ส่วนขยายของบุคคลที่สามที่ลงทะเบียน (ส่วนใหญ่สำหรับ Fugue SQL) ประกอบด้วย:
อย่าลังเลที่จะส่งข้อความถึงเราบน Slack เรายังมีคำแนะนำในการร่วมให้ข้อมูลด้วย
ดูการนำเสนอและเนื้อหาการประชุมล่าสุดของเรา หากต้องการดูรายการที่สมบูรณ์ยิ่งขึ้น โปรดดูที่หน้าเนื้อหาในบทช่วยสอน