该项目提供了 Scala 和 Python 中 Apache Spark 项目的扩展:
Diff: Dataset
的diff
转换和应用程序,用于计算两个数据集之间的差异,即从一个数据集到另一个数据集要添加、删除或更改哪些行。
SortedGroups: groupByKey
转换,按键对行进行分组,同时为每个组提供排序迭代器。与Dataset.groupByKey.flatMapGroups
类似,但具有迭代器的顺序保证。
直方图[*] :计算值列的直方图 DataFrame 的histogram
转换。
全局行号[*] : withRowNumbers
转换,提供数据集当前顺序或任何给定顺序的全局行号。与需要窗口规范的现有 SQL 函数row_number
相比,此转换提供整个数据集中的行号,而不会出现缩放问题。
分区写入: writePartitionedBy
操作可通过单个操作对Dataset
集进行分区和高效布局。
检查 Parquet 文件[*] :可以通过从简单的 Spark 数据源读取来检查 Parquet 文件的结构(元数据,而不是存储在 Parquet 中的数据),类似于 parquet-tools 或 parquet-cli。这简化了识别 Spark 无法将某些 Parquet 文件分割为可扩展分区的原因。
将 Python 包安装到 PySpark 作业中[*] :通过 PIP 或 Poetry 以编程方式将 Python 依赖项安装到正在运行的 PySpark 作业中 (PySpark ≥ 3.1.0):
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
流畅的方法调用: T.call(transformation: T => R): R
:将不属于T
的转换T => R
转换为T
上的流畅方法调用。这允许编写流畅的代码,例如:
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
流畅的条件方法调用: T.when(condition: Boolean).call(transformation: T => T): T
:仅当给定条件为 true 时才能流畅地执行转换。这允许编写流畅的代码,例如:
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
groupBy.as 的快捷方式:只要可能,应优先调用Dataset.groupBy(Column*).as[K, T]
而不是调用Dataset.groupByKey(V => K)
。前者允许 Catalyst 利用数据集的现有分区和排序,而后者向 Catalyst 隐藏哪些列用于创建键。这可能会严重影响性能。
新的基于列表达式的groupByKey[K](Column*)
方法可以更轻松地按列表达式键进行分组。而不是
ds.groupBy($"id").as[Int, V]
使用:
ds.groupByKey[Int]($"id")
Backticks: backticks(string: String, strings: String*): String)
:需要时用反引号 ( `
) 将给定的列名括起来。这是确保带有点 ( .
) 等特殊字符的列名与col()
或select()
配合使用的便捷方法。
计算空值: count_null(e: Column)
:类似于count
聚合函数,用于计算列e
中的空值。这相当于调用count(when(e.isNull, lit(1)))
。
.Net DateTime.Ticks [*] :将 .Net(C#、F#、Visual Basic) DateTime.Ticks
转换为 Spark 时间戳、秒和纳秒。
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
反向操作由以下方式提供(所有返回LongType
.Net 刻度):
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
这些方法在 Python 中也可用:
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Spark 临时目录[*] :创建一个临时目录,该目录将在 Spark 应用程序关闭时删除。
斯卡拉:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
Python:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
Spark 作业描述[*] :为上下文中的所有 Spark 作业设置 Spark 作业描述。
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
没有职位描述 | 有职位描述 |
---|---|
请注意,在一个线程中设置描述,同时在另一线程中调用操作(例如.count
)是不起作用的,除非在设置描述后从当前线程生成不同的线程。
并行集合的工作示例:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
spark-extension
包适用于所有 Spark 3.2、3.3、3.4 和 3.5 版本。一些早期的 Spark 版本也可能受支持。包版本具有以下语义: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
:Scala 二进制兼容性(次要)版本。可用版本有2.12
和2.13
。SPARK_COMPAT_VERSION
:Apache Spark 二进制兼容性(次要)版本。可用的有3.2
3.4
3.3
3.5
VERSION
:软件包版本,例如2.10.0
。将此行添加到您的build.sbt
文件中:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
将此依赖项添加到您的pom.xml
文件中:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
将此依赖项添加到您的build.gradle
文件中:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
提交具有 Spark 扩展依赖项(版本 ≥1.1.0)的 Spark 应用程序,如下所示:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
注意:根据您的 Spark 版本选择正确的 Scala 版本(此处为 2.12)和 Spark 版本(此处为 3.5)。
启动具有 Spark 扩展依赖项(版本 ≥1.1.0)的 Spark Shell,如下所示:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
注意:根据您的 Spark Shell 版本选择正确的 Scala 版本(此处为 2.12)和 Spark 版本(此处为 3.5)。
使用 Spark 扩展依赖项(版本 ≥1.1.0)启动 PySpark 会话,如下所示:
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
注意:根据您的 PySpark 版本选择正确的 Scala 版本(此处为 2.12)和 Spark 版本(此处为 3.5)。
启动具有 Spark 扩展依赖项(版本 ≥1.1.0)的 Python Spark REPL,如下所示:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
注意:根据您的 PySpark 版本选择正确的 Scala 版本(此处为 2.12)和 Spark 版本(此处为 3.5)。
spark-submit
通过spark-submit
运行使用PySpark的Python脚本:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
注意:根据您的 Spark 版本选择正确的 Scala 版本(此处为 2.12)和 Spark 版本(此处为 3.5)。
您可能需要将 PyPi 中的pyspark-extension
python 包安装到您的开发环境中。这为您提供了开发阶段的代码补全、键入和测试功能。
在 Spark 集群上运行 Python 应用程序仍需要使用上述方法之一将 Scala 包添加到 Spark 环境。
pip install pyspark-extension==2.13.0.3.5
注意:根据您的 PySpark 版本选择正确的 Spark 版本(此处为 3.5)。
周围有很多数据科学笔记本。要使用此库,请使用以下Maven 坐标将jar 依赖项添加到您的笔记本:
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
或者下载 jar 并将其放置在笔记本可以访问的文件系统上,然后直接引用该 jar 文件。
检查您最喜欢的笔记本的文档,了解如何将 jar 添加到您的 Spark 环境。
Python 中的大多数功能不支持与 Spark Connect 服务器结合使用。这也适用于 Databricks 运行时环境 13.x 及更高版本。详细信息可以在这个博客中找到。
连接到 Spark Connect 服务器时调用任何这些功能都会引发此错误:
This feature is not supported for Spark Connect.
请改用 Spark 集群的经典连接。
您可以针对不同版本的 Spark 和 Scala 构建此项目。
如果您想要构建与pom.xml
文件中定义的版本不同的 Spark 或 Scala 版本,请运行
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
例如,通过运行sh set-version.sh 3.5.0 2.13.8
切换到 Spark 3.5.0 和 Scala 2.13.8 。
然后执行mvn package
从源创建一个 jar。它可以在target/
中找到。
通过mvn test
运行 Scala 测试。
为了运行 Python 测试,请按如下所示设置 Python 环境(将[SCALA-COMPAT-VERSION]
和[SPARK-COMPAT-VERSION]
替换为相应的值):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
通过env PYTHONPATH=python:python/test python -m pytest python/test
运行 Python 测试。
注意:您首先必须构建 Scala 源代码。
在项目根目录中运行以下命令序列:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
然后执行python -m build python/
从源创建一个whl。它可以在python/dist/
中找到。