このプロジェクトは、Scala と Python の Apache Spark プロジェクトへの拡張機能を提供します。
Diff: 2 つのデータセット間の差異、つまり、一方のデータセットから他方のデータセットに取得するためにどの行を追加、削除、または変更するかを計算するDataset
のdiff
変換およびアプリケーション。
SortedGroups:各グループにソートされたイテレータを提供しながら、キーによって行をgroupByKey
変換。 Dataset.groupByKey.flatMapGroups
に似ていますが、反復子の順序が保証されています。
Histogram [*] :値列のヒストグラム DataFrame を計算するhistogram
変換。
Global Row Number [*] :データセットの現在の順序、または任意の順序に関するグローバル行番号を提供するwithRowNumbers
変換。ウィンドウ仕様を必要とする既存の SQL 関数row_number
とは対照的に、この変換はスケーリングの問題を発生させることなくデータセット全体の行番号を提供します。
パーティション化された書き込み: writePartitionedBy
アクションは、 Dataset
をパーティション化して書き込み、1 回の操作で効率的にレイアウトします。
Parquet ファイルの検査[*] : Parquet ファイルの構造 (Parquet に保存されているデータではなくメタデータ) は、単純な Spark データ ソースから読み取ることで、parquet-tools または parquet-cli と同様に検査できます。これにより、一部の Parquet ファイルを Spark でスケーラブルなパーティションに分割できない理由の特定が簡単になります。
Python パッケージを PySpark ジョブにインストール[*] : PIP または Poetry を介して、実行中の PySpark ジョブにプログラム的に Python の依存関係をインストールします (PySpark ≥ 3.1.0)。
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
流暢なメソッド呼び出し: T.call(transformation: T => R): R
: T
の一部ではない変換T => R
T
での流暢なメソッド呼び出しに変換します。これにより、次のような流暢なコードを書くことができます。
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
流暢な条件付きメソッド呼び出し: T.when(condition: Boolean).call(transformation: T => T): T
: 指定された条件が true の場合にのみ、変換を流暢に実行します。これにより、次のような流暢なコードを書くことができます。
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
groupBy.as のショートカット: 可能な限り、 Dataset.groupByKey(V => K)
を呼び出すよりもDataset.groupBy(Column*).as[K, T]
呼び出すことを優先する必要があります。前者は Catalyst がデータセットの既存のパーティショニングと順序付けを利用できるようにしますが、後者はキーの作成にどの列が使用されるかを Catalyst から隠します。これにより、パフォーマンスが大幅に低下する可能性があります。
新しい列式ベースのgroupByKey[K](Column*)
メソッドを使用すると、列式キーによるグループ化が簡単になります。の代わりに
ds.groupBy($"id").as[Int, V]
使用:
ds.groupByKey[Int]($"id")
バッククォート: backticks(string: String, strings: String*): String)
: 必要に応じて、指定された列名をバッククォート ( `
) で囲みます。これは、ドット ( .
) などの特殊文字を含む列名がcol()
またはselect()
で確実に機能するようにする便利な方法です。
null 値をカウントする: count_null(e: Column)
: 列e
の null 値をカウントするcount
のような集計関数。これはcount(when(e.isNull, lit(1)))
を呼び出すことと同じです。
.Net DateTime.Ticks [*] : .Net (C#、F#、Visual Basic) DateTime.Ticks
を Spark タイムスタンプ、秒、およびナノ秒に変換します。
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
逆は次のようになります (すべてLongType
.Net ティックを返します)。
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
これらのメソッドは Python でも使用できます。
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Spark 一時ディレクトリ[*] : Spark アプリケーションのシャットダウン時に削除される一時ディレクトリを作成します。
スカラ:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
パイソン:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
Spark ジョブの説明[*] :コンテキスト内のすべての Spark ジョブの Spark ジョブの説明を設定します。
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
職務記述書なし | 仕事内容説明付き |
---|---|
説明を設定した後に別のスレッドが現在のスレッドから生成されない限り、あるスレッドで説明を設定しながら別のスレッドでアクション (例: .count
) を呼び出すことは機能しないことに注意してください。
並列コレクションを使用した実際の例:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
spark-extension
パッケージは、Spark 3.2、3.3、3.4、および 3.5 のすべてのバージョンで利用できます。一部の以前の Spark バージョンもサポートされている場合があります。パッケージのバージョンのセマンティクスは次のとおりです: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
: Scala バイナリ互換性 (マイナー) バージョン。利用可能なバージョンは2.12
と2.13
です。SPARK_COMPAT_VERSION
: Apache Spark バイナリ互換性 (マイナー) バージョン。利用可能なのは3.2
、 3.3
、 3.4
および3.5
です。VERSION
: パッケージのバージョン (例: 2.10.0
。次の行をbuild.sbt
ファイルに追加します。
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
この依存関係をpom.xml
ファイルに追加します。
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
この依存関係をbuild.gradle
ファイルに追加します。
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
次のように、Spark 拡張機能の依存関係 (バージョン ≥1.1.0) を使用して Spark アプリを送信します。
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
注: Spark のバージョンに応じて、適切な Scala バージョン (ここでは 2.12) と Spark バージョン (ここでは 3.5) を選択してください。
次のように、Spark Extension の依存関係 (バージョン 1.1.0 以上) を使用して Spark Shell を起動します。
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
注: Spark Shell のバージョンに応じて、適切な Scala バージョン (ここでは 2.12) と Spark バージョン (ここでは 3.5) を選択します。
次のように、Spark Extension の依存関係 (バージョン ≥1.1.0) を使用して PySpark セッションを開始します。
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
注: PySpark のバージョンに応じて、適切な Scala バージョン (ここでは 2.12) と Spark バージョン (ここでは 3.5) を選択してください。
次のように、Spark Extension の依存関係 (バージョン ≥1.1.0) を使用して Python Spark REPL を起動します。
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
注: PySpark のバージョンに応じて、適切な Scala バージョン (ここでは 2.12) と Spark バージョン (ここでは 3.5) を選択してください。
spark-submit
spark-submit
経由で PySpark を使用する Python スクリプトを実行します。
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
注: Spark のバージョンに応じて、適切な Scala バージョン (ここでは 2.12) と Spark バージョン (ここでは 3.5) を選択してください。
PyPi から開発環境にpyspark-extension
Python パッケージをインストールすることもできます。これにより、開発段階でコード補完、入力、テストの機能が提供されます。
Spark クラスター上で Python アプリケーションを実行するには、上記のいずれかの方法で Scala パッケージを Spark 環境に追加する必要があります。
pip install pyspark-extension==2.13.0.3.5
注: PySpark のバージョンに応じて、適切な Spark バージョン (ここでは 3.5) を選択してください。
世の中にはデータ サイエンス ノートブックがたくさんあります。このライブラリを使用するには、次のMaven 座標を使用して、 jar 依存関係をノートブックに追加します。
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
または、jar をダウンロードして、ノートブックからアクセスできるファイル システムに配置し、その jar ファイルを直接参照します。
Spark 環境に jar を追加する方法については、お気に入りのノートブックのドキュメントを確認してください。
ほとんどの機能は、Spark Connect サーバーと組み合わせたPython ではサポートされていません。これは、Databricks ランタイム環境 13.x 以降にも当てはまります。詳細はこのブログでご覧いただけます。
Spark Connect サーバーに接続しているときにこれらの機能のいずれかを呼び出すと、次のエラーが発生します。
This feature is not supported for Spark Connect.
代わりに、Spark クラスターへのクラシック接続を使用してください。
このプロジェクトは、Spark と Scala のさまざまなバージョンに対してビルドできます。
pom.xml
ファイルで定義されているものとは異なる Spark または Scala バージョン用にビルドする場合は、次を実行します。
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
たとえば、 sh set-version.sh 3.5.0 2.13.8
を実行して、Spark 3.5.0 および Scala 2.13.8 に切り替えます。
次に、 mvn package
実行してソースから jar を作成します。これはtarget/
にあります。
mvn test
経由で Scala テストを実行します。
Python テストを実行するには、次のように Python 環境をセットアップします ( [SCALA-COMPAT-VERSION]
と[SPARK-COMPAT-VERSION]
をそれぞれの値に置き換えます)。
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
env PYTHONPATH=python:python/test python -m pytest python/test
を介して Python テストを実行します。
注: 最初に Scala ソースをビルドする必要があります。
プロジェクトのルート ディレクトリで次の一連のコマンドを実行します。
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
次に、 python -m build python/
を実行して、ソースから whl を作成します。これはpython/dist/
にあります。