此專案提供了 Scala 和 Python 中 Apache Spark 專案的擴充:
Diff: Dataset
的diff
轉換和應用程序,用於計算兩個資料集之間的差異,即從一個資料集到另一個資料集要新增、刪除或更改哪些行。
SortedGroups: groupByKey
轉換,按鍵將行分組,同時為每個群組提供排序迭代器。與Dataset.groupByKey.flatMapGroups
類似,但具有迭代器的順序保證。
直方圖[*] :計算值列的直方圖 DataFrame 的histogram
轉換。
全域行號[*] : withRowNumbers
轉換,提供資料集目前順序或任何給定順序的全域行號。與需要視窗規範的現有 SQL 函數row_number
相比,此轉換提供整個資料集中的行號,而不會出現縮放問題。
分區寫入: writePartitionedBy
操作可透過單一操作對Dataset
集進行分區和高效佈局。
檢查 Parquet 檔案[*] :可以從簡單的 Spark 資料來源讀取來檢查 Parquet 檔案的結構(元數據,而不是儲存在 Parquet 中的資料),類似於 parquet-tools 或 parquet-cli。這簡化了識別 Spark 無法將某些 Parquet 檔案分割為可擴展分區的原因。
將 Python 套件安裝到 PySpark 作業中[*] :透過 PIP 或 Poetry 以程式設計方式將 Python 依賴項安裝到正在執行的 PySpark 作業中 (PySpark ≥ 3.1.0):
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
流暢的方法呼叫: T.call(transformation: T => R): R
:將不屬於T
的轉換T => R
轉換為T
上的流暢方法呼叫。這允許編寫流暢的程式碼,例如:
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
流暢的條件方法呼叫: T.when(condition: Boolean).call(transformation: T => T): T
:僅當給定條件為 true 時才能流暢地執行轉換。這允許編寫流暢的程式碼,例如:
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
groupBy.as 的捷徑:只要可能,應優先呼叫Dataset.groupBy(Column*).as[K, T]
而不是呼叫Dataset.groupByKey(V => K)
。前者允許 Catalyst 利用資料集的現有分區和排序,而後者向 Catalyst 隱藏哪些欄位用於建立鍵。這可能會嚴重影響效能。
新的基於清單達式的groupByKey[K](Column*)
方法可以更輕鬆地按下清單達式鍵進行分組。而不是
ds.groupBy($"id").as[Int, V]
使用:
ds.groupByKey[Int]($"id")
Backticks: backticks(string: String, strings: String*): String)
:需要時用反引號 ( `
) 將給定的列名括起來。這是確保有點 ( .
) 等特殊字元的列名與col()
或select()
配合使用的便捷方法。
計算空值: count_null(e: Column)
:類似count
聚合函數,用於計算列e
中的空值。這相當於呼叫count(when(e.isNull, lit(1)))
。
.Net DateTime.Ticks [*] :將 .Net(C#、F#、Visual Basic) DateTime.Ticks
轉換為 Spark 時間戳、秒和奈秒。
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
反向操作由以下方式提供(所有返回LongType
.Net 刻度):
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
這些方法在 Python 中也可用:
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Spark 暫存目錄[*] :建立一個暫存目錄,該目錄會在 Spark 應用程式關閉時刪除。
斯卡拉:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
Python:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
Spark 作業描述[*] :為所有上下文中的 Spark 作業設定 Spark 作業描述。
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
沒有職位描述 | 有職位描述 |
---|---|
請注意,在一個執行緒中設定描述,同時在另一個執行緒中呼叫操作(例如.count
)是不起作用的,除非在設定描述後從目前執行緒產生不同的執行緒。
並行集合的工作範例:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
spark-extension
套件適用於所有 Spark 3.2、3.3、3.4 和 3.5 版本。一些早期的 Spark 版本也可能受支援。套件版本有以下語意: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
:Scala 二進位相容性(次要)版本。可用版本有2.12
和2.13
。SPARK_COMPAT_VERSION
:Apache Spark 二進位相容性(次要)版本。可用的有3.2
3.4
3.3
3.5
VERSION
:軟體包版本,例如2.10.0
。將此行新增至您的build.sbt
檔案:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
將此依賴項新增至您的pom.xml
檔案:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
將此依賴項新增至您的build.gradle
檔案:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
提交具有 Spark 擴充依賴項(版本 ≥1.1.0)的 Spark 應用程序,如下所示:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
注意:根據您的 Spark 版本選擇正確的 Scala 版本(此處為 2.12)和 Spark 版本(此處為 3.5)。
啟動具有 Spark 擴充依賴項(版本 ≥1.1.0)的 Spark Shell,如下所示:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
注意:根據您的 Spark Shell 版本選擇正確的 Scala 版本(此處為 2.12)和 Spark 版本(此處為 3.5)。
使用 Spark 擴充依賴項(版本 ≥1.1.0)啟動 PySpark 會話,如下所示:
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
注意:根據您的 PySpark 版本選擇正確的 Scala 版本(此處為 2.12)和 Spark 版本(此處為 3.5)。
啟動具有 Spark 擴充依賴項(版本 ≥1.1.0)的 Python Spark REPL,如下所示:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
注意:根據您的 PySpark 版本選擇正確的 Scala 版本(此處為 2.12)和 Spark 版本(此處為 3.5)。
spark-submit
透過spark-submit
運行使用PySpark的Python腳本:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
注意:根據您的 Spark 版本選擇正確的 Scala 版本(此處為 2.12)和 Spark 版本(此處為 3.5)。
您可能需要將 PyPi 中的pyspark-extension
python 套件安裝到您的開發環境中。這為您提供了開發階段的程式碼補全、鍵入和測試功能。
在 Spark 叢集上運行 Python 應用程式仍需要使用上述方法之一將 Scala 套件新增至 Spark 環境。
pip install pyspark-extension==2.13.0.3.5
注意:根據您的 PySpark 版本選擇正確的 Spark 版本(此處為 3.5)。
周圍有很多數據科學筆記本。若要使用此庫,請使用下列Maven 座標將jar 依賴項新增至您的筆記本:
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
或者下載 jar 並將其放置在筆記本可以存取的檔案系統上,然後直接引用該 jar 檔案。
檢查您最喜歡的筆記本的文檔,了解如何將 jar 添加到您的 Spark 環境。
Python 中的大多數功能不支援與 Spark Connect 伺服器結合使用。這也適用於 Databricks 運行時環境 13.x 及更高版本。詳細資訊可以在這個部落格中找到。
連接到 Spark Connect 伺服器時呼叫任何這些功能都會引發此錯誤:
This feature is not supported for Spark Connect.
請改用 Spark 叢集的經典連線。
您可以針對不同版本的 Spark 和 Scala 建立此專案。
如果您想要建立與pom.xml
檔案中定義的版本不同的 Spark 或 Scala 版本,請執行
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
例如,透過運行sh set-version.sh 3.5.0 2.13.8
切換到 Spark 3.5.0 和 Scala 2.13.8 。
然後執行mvn package
從來源建立一個 jar。它可以在target/
中找到。
透過mvn test
運行 Scala 測試。
為了執行 Python 測試,請如下所示設定 Python 環境(將[SCALA-COMPAT-VERSION]
和[SPARK-COMPAT-VERSION]
替換為對應的值):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
透過env PYTHONPATH=python:python/test python -m pytest python/test
運行 Python 測試。
注意:您首先必須建立 Scala 原始碼。
在專案根目錄中執行以下命令序列:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
然後執行python -m build python/
從來源建立一個whl。它可以在python/dist/
中找到。