Учебники | API-документация | Общайтесь с нами в Slack! |
---|---|---|
Fugue — это унифицированный интерфейс для распределенных вычислений, который позволяет пользователям выполнять код Python, Pandas и SQL в Spark, Dask и Ray с минимальными перезаписями .
Фуга чаще всего используется для:
Чтобы сравнить Fugue с другими фреймворками, такими как dbt, Arrow, Ibis, PySpark Pandas, см. сравнения.
API Fugue — это набор функций, которые могут работать на Pandas, Spark, Dask и Ray. Самый простой способ использовать Fugue — это функция transform()
. Это позволяет пользователям распараллеливать выполнение одной функции, перенеся ее в Spark, Dask или Ray. В приведенном ниже примере функция map_letter_to_food()
принимает сопоставление и применяет его к столбцу. Пока это только Pandas и Python (без Fugue).
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
Теперь функция map_letter_to_food()
переносится в механизм выполнения Spark путем вызова функции transform()
Fugue. Выходная schema
и params
передаются в вызов transform()
. schema
необходима, потому что это требование для распределенных платформ. Схема со "*"
ниже означает, что все входные столбцы находятся в выходных данных.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
Этот синтаксис проще, понятнее и удобнее в обслуживании, чем эквивалент PySpark. В то же время в исходную функцию на основе Pandas не было внесено никаких изменений, чтобы перенести ее в Spark. Его по-прежнему можно использовать в Pandas DataFrames. Fugue transform()
также поддерживает Dask и Ray в качестве механизмов выполнения наряду со стандартным механизмом на основе Pandas.
API Fugue имеет более широкий набор функций, которые также совместимы со Spark, Dask и Ray. Например, мы можем использовать load()
и save()
для создания сквозного рабочего процесса, совместимого со Spark, Dask и Ray. Полный список функций см. в API верхнего уровня.
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
Все функции в рамках контекста будут выполняться на указанном бэкэнде. Это позволяет легко переключаться между локальным и распределенным выполнением.
FugueSQL — это язык на основе SQL, способный выражать сквозные рабочие процессы обработки данных поверх Pandas, Spark и Dask. Приведенная выше функция map_letter_to_food()
используется в приведенном ниже выражении SQL. Вот как можно использовать функцию, определенную Python, вместе со стандартным оператором SQL SELECT
.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Фугу можно установить через pip или conda. Например:
pip install fugue
Чтобы использовать Fugue SQL, настоятельно рекомендуется установить дополнительный модуль sql
:
pip install fugue[sql]
Он также имеет следующие дополнительные возможности установки:
Например, общий вариант использования:
pip install " fugue[duckdb,spark] "
Обратите внимание, что если вы уже установили Spark или DuckDB независимо, Fugue может автоматически использовать их без установки дополнений.
Лучший способ начать работу с Fugue — изучить 10-минутные руководства:
Для API верхнего уровня см.:
Учебные пособия также можно запускать в интерактивной среде блокнота через Binder или Docker:
Обратите внимание, что он работает медленно на связующем устройстве , поскольку машина на связующем устройстве недостаточно мощна для распределенной платформы, такой как Spark. Параллельное выполнение может стать последовательным, поэтому некоторые примеры сравнения производительности не дадут вам правильных цифр.
В качестве альтернативы вы должны получить достойную производительность, запустив этот образ Docker на своем компьютере:
docker run -p 8888:8888 fugueproject/tutorials:latest
Существует сопутствующее расширение записной книжки для FugueSQL, которое позволяет пользователям использовать магию ячеек %%fsql
. Расширение также обеспечивает подсветку синтаксиса для ячеек FugueSQL. Он работает как для классического блокнота, так и для Jupyter Lab. Более подробную информацию можно найти в инструкции по установке.
Будучи слоем абстракции, Fugue можно легко использовать со многими другими проектами с открытым исходным кодом.
Серверные части Python:
Серверные части FugueSQL:
Fugue доступна в качестве бэкэнда или может интегрироваться со следующими проектами:
Зарегистрированные сторонние расширения (в основном для Fugue SQL) включают:
Не стесняйтесь писать нам в Slack. У нас также есть инструкции по содействию.
Посмотрите некоторые из наших последних презентаций и материалов конференций. Более полный список можно найти на странице «Содержимое» в руководствах.