튜토리얼 | API 문서 | Slack에서 우리와 채팅해보세요! |
---|---|---|
Fugue는 사용자가 최소한의 재작성으로 Spark, Dask 및 Ray에서 Python, Pandas 및 SQL 코드를 실행할 수 있게 해주는 분산 컴퓨팅을 위한 통합 인터페이스입니다 .
푸가는 다음과 같은 경우에 가장 일반적으로 사용됩니다.
Fugue가 dbt, Arrow, Ibis, PySpark Pandas와 같은 다른 프레임워크와 어떻게 비교되는지 보려면 비교를 참조하세요.
Fugue API는 Pandas, Spark, Dask 및 Ray에서 실행할 수 있는 함수 모음입니다. Fugue를 사용하는 가장 간단한 방법은 transform()
함수입니다. 이를 통해 사용자는 단일 기능을 Spark, Dask 또는 Ray로 가져와서 실행을 병렬화할 수 있습니다. 아래 예에서 map_letter_to_food()
함수는 매핑을 가져와 이를 열에 적용합니다. 이것은 지금까지 (Fuge 없이) Pandas와 Python뿐입니다.
import pandas as pd
from typing import Dict
input_df = pd . DataFrame ({ "id" :[ 0 , 1 , 2 ], "value" : ([ "A" , "B" , "C" ])})
map_dict = { "A" : "Apple" , "B" : "Banana" , "C" : "Carrot" }
def map_letter_to_food ( df : pd . DataFrame , mapping : Dict [ str , str ]) -> pd . DataFrame :
df [ "value" ] = df [ "value" ]. map ( mapping )
return df
이제 Fugue의 transform()
함수를 호출하여 map_letter_to_food()
함수를 Spark 실행 엔진으로 가져옵니다. 출력 schema
와 params
transform()
호출에 전달됩니다. schema
분산 프레임워크의 요구 사항이므로 필요합니다. 아래의 "*"
스키마는 모든 입력 열이 출력에 있음을 의미합니다.
from pyspark . sql import SparkSession
from fugue import transform
spark = SparkSession . builder . getOrCreate ()
sdf = spark . createDataFrame ( input_df )
out = transform ( sdf ,
map_letter_to_food ,
schema = "*" ,
params = dict ( mapping = map_dict ),
)
# out is a Spark DataFrame
out . show ()
+---+------+
| id| value |
+---+------+
| 0| Apple |
| 1|Banana |
| 2|Carrot |
+---+------+
from typing import Iterator , Union
from pyspark . sql . types import StructType
from pyspark . sql import DataFrame , SparkSession
spark_session = SparkSession . builder . getOrCreate ()
def mapping_wrapper ( dfs : Iterator [ pd . DataFrame ], mapping ):
for df in dfs :
yield map_letter_to_food ( df , mapping )
def run_map_letter_to_food ( input_df : Union [ DataFrame , pd . DataFrame ], mapping ):
# conversion
if isinstance ( input_df , pd . DataFrame ):
sdf = spark_session . createDataFrame ( input_df . copy ())
else :
sdf = input_df . copy ()
schema = StructType ( list ( sdf . schema . fields ))
return sdf . mapInPandas ( lambda dfs : mapping_wrapper ( dfs , mapping ),
schema = schema )
result = run_map_letter_to_food ( input_df , map_dict )
result . show ()
이 구문은 PySpark에 상응하는 구문보다 더 간단하고 깔끔하며 유지 관리가 더 쉽습니다. 동시에 Spark로 가져오기 위해 원래 Pandas 기반 기능을 편집하지 않았습니다. Pandas DataFrames에서는 계속 사용할 수 있습니다. Fugue transform()
기본 Pandas 기반 엔진과 함께 실행 엔진으로 Dask 및 Ray도 지원합니다.
Fugue API에는 Spark, Dask 및 Ray와도 호환되는 더 광범위한 기능 모음이 있습니다. 예를 들어, load()
및 save()
사용하여 Spark, Dask 및 Ray와 호환되는 엔드투엔드 워크플로를 생성할 수 있습니다. 전체 함수 목록은 최상위 수준 API를 참조하세요.
import fugue . api as fa
def run ( engine = None ):
with fa . engine_context ( engine ):
df = fa . load ( "/path/to/file.parquet" )
out = fa . transform ( df , map_letter_to_food , schema = "*" )
fa . save ( out , "/path/to/output_file.parquet" )
run () # runs on Pandas
run ( engine = "spark" ) # runs on Spark
run ( engine = "dask" ) # runs on Dask
컨텍스트 아래의 모든 기능은 지정된 백엔드에서 실행됩니다. 이를 통해 로컬 실행과 분산 실행 사이를 쉽게 전환할 수 있습니다.
FugueSQL은 Pandas, Spark 및 Dask를 기반으로 엔드투엔드 데이터 워크플로우를 표현할 수 있는 SQL 기반 언어입니다. 위의 map_letter_to_food()
함수는 아래 SQL 표현식에서 사용됩니다. 표준 SQL SELECT
문과 함께 Python 정의 함수를 사용하는 방법입니다.
from fugue . api import fugue_sql
import json
query = """
SELECT id, value
FROM input_df
TRANSFORM USING map_letter_to_food(mapping={{mapping}}) SCHEMA *
"""
map_dict_str = json . dumps ( map_dict )
# returns Pandas DataFrame
fugue_sql ( query , mapping = map_dict_str )
# returns Spark DataFrame
fugue_sql ( query , mapping = map_dict_str , engine = "spark" )
Fugue는 pip나 conda를 통해 설치할 수 있습니다. 예를 들어:
pip install fugue
Fugue SQL을 사용하려면 sql
Extra를 설치하는 것이 좋습니다.
pip install fugue[sql]
또한 다음과 같은 설치 추가 기능도 있습니다.
예를 들어 일반적인 사용 사례는 다음과 같습니다.
pip install " fugue[duckdb,spark] "
Spark 또는 DuckDB를 이미 독립적으로 설치한 경우 Fugue는 추가 항목을 설치하지 않고도 자동으로 사용할 수 있습니다.
Fugue를 시작하는 가장 좋은 방법은 10분 튜토리얼을 통해 작업하는 것입니다.
최상위 API에 대해서는 다음을 참조하세요.
튜토리얼은 바인더나 Docker를 통해 대화형 노트북 환경에서 실행할 수도 있습니다.
바인더의 컴퓨터는 Spark와 같은 분산 프레임워크에 비해 강력하지 않기 때문에 바인더에서 느리게 실행됩니다 . 병렬 실행은 순차적으로 실행될 수 있으므로 일부 성능 비교 예제에서는 올바른 수치를 제공하지 않습니다.
또는 자체 머신에서 이 Docker 이미지를 실행하여 적절한 성능을 얻을 수 있습니다.
docker run -p 8888:8888 fugueproject/tutorials:latest
사용자가 %%fsql
셀 매직을 사용할 수 있게 해주는 FugueSQL용 노트북 확장이 함께 제공됩니다. 확장 기능은 FugueSQL 셀에 대한 구문 강조도 제공합니다. 클래식 노트북과 Jupyter Lab 모두에서 작동합니다. 자세한 내용은 설치 지침에서 확인할 수 있습니다.
추상화 계층으로서 Fugue는 다른 많은 오픈 소스 프로젝트와 원활하게 사용할 수 있습니다.
Python 백엔드:
FugueSQL 백엔드:
Fugue는 백엔드로 사용 가능하거나 다음 프로젝트와 통합될 수 있습니다.
등록된 타사 확장(주로 Fugue SQL용)에는 다음이 포함됩니다.
Slack으로 언제든지 메시지를 보내주세요. 기여 지침도 있습니다.
최신 컨퍼런스 프레젠테이션 및 콘텐츠를 확인하세요. 더 완전한 목록을 보려면 튜토리얼의 콘텐츠 페이지를 확인하세요.