دروس | وثائق واجهة برمجة التطبيقات | الدردشة معنا على الركود! |
---|---|---|
Fugue هي واجهة موحدة للحوسبة الموزعة تتيح للمستخدمين تنفيذ تعليمات Python وPandas وSQL البرمجية على Spark وDask وRay بأقل قدر من عمليات إعادة الكتابة .
يستخدم الشرود بشكل شائع من أجل:
لمعرفة كيفية مقارنة Fugue بأطر العمل الأخرى مثل dbt وArrow وIbis وPySpark Pandas، راجع المقارنات
Fugue API عبارة عن مجموعة من الوظائف القادرة على التشغيل على Pandas وSpark وDask وRay. إن أبسط طريقة لاستخدام Fugue هي وظيفة transform()
. يتيح ذلك للمستخدمين موازنة تنفيذ وظيفة واحدة عن طريق إحضارها إلى Spark أو Dask أو Ray. في المثال أدناه، تقوم الدالة map_letter_to_food()
بأخذ التعيين وتطبيقه على عمود. هذه مجرد Pandas وPython حتى الآن (بدون Fugue).
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
الآن، يتم إحضار وظيفة map_letter_to_food()
إلى محرك تنفيذ Spark عن طريق استدعاء وظيفة transform()
الخاصة بـ Fugue. يتم تمرير schema
الإخراج params
إلى استدعاء transform()
. schema
مطلوب لأنه متطلب للأطر الموزعة. المخطط "*"
أدناه يعني أن جميع أعمدة الإدخال موجودة في الإخراج.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
يعد بناء الجملة هذا أبسط وأنظف وأكثر قابلية للصيانة من مكافئ PySpark. في الوقت نفسه، لم يتم إجراء أي تعديلات على الوظيفة الأصلية المستندة إلى Pandas لإحضارها إلى Spark. لا يزال قابلاً للاستخدام على Pandas DataFrames. يدعم Fugue transform()
أيضًا Dask وRay كمحركات تنفيذ إلى جانب المحرك الافتراضي المستند إلى Pandas.
تحتوي Fugue API على مجموعة واسعة من الوظائف المتوافقة أيضًا مع Spark وDask وRay. على سبيل المثال، يمكننا استخدام load()
save()
لإنشاء سير عمل شامل متوافق مع Spark وDask وRay. للحصول على القائمة الكاملة للوظائف، راجع Top Level API
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
سيتم تشغيل جميع الوظائف الموجودة أسفل السياق على الواجهة الخلفية المحددة. وهذا يجعل من السهل التبديل بين التنفيذ المحلي والتنفيذ الموزع.
FugueSQL هي لغة قائمة على SQL قادرة على التعبير عن سير عمل البيانات الشاملة أعلى Pandas وSpark وDask. يتم استخدام الدالة map_letter_to_food()
أعلاه في تعبير SQL أدناه. هذه هي كيفية استخدام دالة معرفة بواسطة Python مع عبارة SQL SELECT
القياسية.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
يمكن تثبيت الشرود من خلال النقطة أو الكوندا. على سبيل المثال:
pip install fugue
من أجل استخدام Fugue SQL، يوصى بشدة بتثبيت sql
الإضافي:
pip install fugue[sql]
كما أن لديها إضافات التثبيت التالية:
على سبيل المثال حالة الاستخدام الشائع هي:
pip install " fugue[duckdb,spark] "
لاحظ أنه إذا قمت بالفعل بتثبيت Spark أو DuckDB بشكل مستقل، فإن Fugue قادر على استخدامها تلقائيًا دون تثبيت الإضافات.
أفضل طريقة للبدء مع Fugue هي العمل من خلال البرامج التعليمية التي تبلغ مدتها 10 دقائق:
للحصول على واجهة برمجة التطبيقات (API) ذات المستوى الأعلى، راجع:
يمكن أيضًا تشغيل البرامج التعليمية في بيئة دفتر ملاحظات تفاعلية من خلال Binder أو Docker:
لاحظ أنه يعمل ببطء على الموثق لأن الجهاز الموجود على الموثق ليس قويًا بدرجة كافية لإطار عمل موزع مثل Spark. يمكن أن تصبح عمليات التنفيذ المتوازية متسلسلة، لذا فإن بعض أمثلة مقارنة الأداء لن تعطيك الأرقام الصحيحة.
وبدلاً من ذلك، يجب أن تحصل على أداء جيد عن طريق تشغيل صورة Docker هذه على جهازك الخاص:
docker run -p 8888:8888 fugueproject/tutorials:latest
يوجد ملحق دفتر ملاحظات مصاحب لـ FugueSQL يتيح للمستخدمين استخدام سحر الخلية %%fsql
. يوفر الامتداد أيضًا تمييزًا لبناء الجملة لخلايا FugueSQL. إنه يعمل مع كل من أجهزة الكمبيوتر المحمولة الكلاسيكية وJupyter Lab. يمكن العثور على مزيد من التفاصيل في تعليمات التثبيت.
من خلال كونها طبقة تجريدية، يمكن استخدام Fugue مع الكثير من المشاريع الأخرى مفتوحة المصدر بسلاسة.
واجهات بايثون الخلفية:
الواجهات الخلفية لـ FugueSQL:
Fugue متاح كواجهة خلفية أو يمكن دمجه مع المشاريع التالية:
تتضمن ملحقات الطرف الثالث المسجلة (خاصة لـ Fugue SQL) ما يلي:
لا تتردد في مراسلتنا على سلاك. لدينا أيضًا تعليمات المساهمة.
شاهد بعضًا من أحدث العروض التقديمية والمحتوى الخاص بالمؤتمرات. للحصول على قائمة أكثر اكتمالا، راجع صفحة المحتوى في البرامج التعليمية.