Dieses Projekt bietet Erweiterungen zum Apache Spark-Projekt in Scala und Python:
Diff: Eine diff
-Transformation und Anwendung für Dataset
, die die Unterschiede zwischen zwei Datasets berechnet, d. h. welche Zeilen hinzugefügt , gelöscht oder geändert werden müssen, um von einem Dataset zum anderen zu gelangen.
SortedGroups: Eine groupByKey
-Transformation, die Zeilen nach einem Schlüssel gruppiert und gleichzeitig einen sortierten Iterator für jede Gruppe bereitstellt. Ähnlich wie Dataset.groupByKey.flatMapGroups
, jedoch mit Reihenfolgegarantien für den Iterator.
Histogramm [*] : Eine histogram
, die den Histogramm-DataFrame für eine Wertespalte berechnet.
Globale Zeilennummer [*] : Eine withRowNumbers
Transformation, die die globale Zeilennummer in Bezug auf die aktuelle Reihenfolge des Datensatzes oder eine beliebige bestimmte Reihenfolge bereitstellt. Im Gegensatz zur bestehenden SQL-Funktion row_number
, die eine Fensterspezifikation erfordert, stellt diese Transformation die Zeilennummer im gesamten Datensatz ohne Skalierungsprobleme bereit.
Partitioniertes Schreiben: Die Aktion writePartitionedBy
schreibt Ihren Dataset
partitioniert und effizient angeordnet mit einem einzigen Vorgang.
Parquet-Dateien prüfen [*] : Die Struktur von Parquet-Dateien (die Metadaten, nicht die in Parquet gespeicherten Daten) kann ähnlich wie bei parquet-tools oder parquet-cli durch Lesen aus einer einfachen Spark-Datenquelle überprüft werden. Dies vereinfacht die Identifizierung, warum einige Parquet-Dateien von Spark nicht in skalierbare Partitionen aufgeteilt werden können.
Installieren Sie Python-Pakete im PySpark-Job [*] : Installieren Sie Python-Abhängigkeiten über PIP oder Poetry programmgesteuert in Ihren laufenden PySpark-Job (PySpark ≥ 3.1.0):
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
Fluent-Methodenaufruf: T.call(transformation: T => R): R
: Wandelt eine Transformation T => R
, die nicht Teil von T
ist, in einen Fluent-Methodenaufruf für T
um. Dies ermöglicht das Schreiben von flüssigem Code wie:
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
Fließender bedingter Methodenaufruf: T.when(condition: Boolean).call(transformation: T => T): T
: Führen Sie eine Transformation nur dann fließend durch, wenn die gegebene Bedingung wahr ist. Dies ermöglicht das Schreiben von flüssigem Code wie:
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
Abkürzung für groupBy.as : Der Aufruf Dataset.groupBy(Column*).as[K, T]
sollte nach Möglichkeit dem Aufruf Dataset.groupByKey(V => K)
vorgezogen werden. Ersteres ermöglicht es Catalyst, die vorhandene Partitionierung und Reihenfolge des Datensatzes auszunutzen, während letzteres Catalyst verbirgt, welche Spalten zum Erstellen der Schlüssel verwendet werden. Dies kann zu erheblichen Leistungseinbußen führen.
Die neue auf Spaltenausdrücken basierende Methode groupByKey[K](Column*)
erleichtert die Gruppierung nach einem Spaltenausdrucksschlüssel. Anstatt
ds.groupBy($"id").as[Int, V]
verwenden:
ds.groupByKey[Int]($"id")
Backticks: backticks(string: String, strings: String*): String)
: Schließt den angegebenen Spaltennamen bei Bedarf mit Backticks ( `
) ein. Dies ist eine praktische Methode, um sicherzustellen, dass Spaltennamen mit Sonderzeichen wie Punkten ( .
) mit col()
oder select()
funktionieren.
Nullwerte zählen: count_null(e: Column)
: eine Aggregationsfunktion wie count
die Nullwerte in Spalte e
zählt. Dies entspricht dem Aufruf von count(when(e.isNull, lit(1)))
.
.Net DateTime.Ticks [*] : Konvertieren Sie .Net (C#, F#, Visual Basic) DateTime.Ticks
in Spark-Zeitstempel, Sekunden und Nanosekunden.
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
Das Gegenteil wird durch (alle geben LongType
.Net-Ticks zurück) bereitgestellt:
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
Diese Methoden sind auch in Python verfügbar:
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Temporäres Spark-Verzeichnis [*] : Erstellen Sie ein temporäres Verzeichnis, das beim Herunterfahren der Spark-Anwendung entfernt wird.
Scala:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
Python:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
Spark-Jobbeschreibung [*] : Legen Sie die Spark-Jobbeschreibung für alle Spark-Jobs innerhalb eines Kontexts fest.
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
Ohne Stellenbeschreibung | Mit Stellenbeschreibung |
---|---|
Beachten Sie, dass das Festlegen einer Beschreibung in einem Thread beim Aufrufen der Aktion (z. B. .count
) in einem anderen Thread nicht funktioniert, es sei denn, der andere Thread wird aus dem aktuellen Thread erzeugt, nachdem die Beschreibung festgelegt wurde.
Arbeitsbeispiel mit parallelen Sammlungen:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
Das spark-extension
ist für alle Spark-Versionen 3.2, 3.3, 3.4 und 3.5 verfügbar. Möglicherweise werden auch einige frühere Spark-Versionen unterstützt. Die Paketversion hat die folgende Semantik: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
: Scala-Binärkompatibilitätsversion (Nebenversion). Verfügbar sind 2.12
und 2.13
.SPARK_COMPAT_VERSION
: Apache Spark-Binärkompatibilitätsversion (Nebenversion). Verfügbar sind 3.2
, 3.3
, 3.4
und 3.5
.VERSION
: Die Paketversion, z. B. 2.10.0
. Fügen Sie diese Zeile zu Ihrer build.sbt
Datei hinzu:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
Fügen Sie diese Abhängigkeit zu Ihrer pom.xml
Datei hinzu:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
Fügen Sie diese Abhängigkeit zu Ihrer build.gradle
Datei hinzu:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
Senden Sie Ihre Spark-App mit der Spark-Erweiterungsabhängigkeit (Version ≥1.1.0) wie folgt:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
Hinweis: Wählen Sie abhängig von Ihrer Spark-Version die richtige Scala-Version (hier 2.12) und Spark-Version (hier 3.5).
Starten Sie eine Spark-Shell mit der Spark-Erweiterungsabhängigkeit (Version ≥1.1.0) wie folgt:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Hinweis: Wählen Sie abhängig von Ihrer Spark-Shell-Version die richtige Scala-Version (hier 2.12) und Spark-Version (hier 3.5).
Starten Sie eine PySpark-Sitzung mit der Spark Extension-Abhängigkeit (Version ≥1.1.0) wie folgt:
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
Hinweis: Wählen Sie abhängig von Ihrer PySpark-Version die richtige Scala-Version (hier 2.12) und Spark-Version (hier 3.5).
Starten Sie die Python Spark REPL mit der Spark Extension-Abhängigkeit (Version ≥1.1.0) wie folgt:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Hinweis: Wählen Sie abhängig von Ihrer PySpark-Version die richtige Scala-Version (hier 2.12) und Spark-Version (hier 3.5).
spark-submit
Führen Sie Ihre Python-Skripte, die PySpark verwenden, über spark-submit
aus:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
Hinweis: Wählen Sie abhängig von Ihrer Spark-Version die richtige Scala-Version (hier 2.12) und Spark-Version (hier 3.5).
Möglicherweise möchten Sie das Python-Paket pyspark-extension
von PyPi in Ihrer Entwicklungsumgebung installieren. Dies bietet Ihnen während Ihrer Entwicklungsphase Code-Vervollständigungs-, Typisierungs- und Testfunktionen.
Zum Ausführen Ihrer Python-Anwendung auf einem Spark-Cluster ist weiterhin eine der oben genannten Möglichkeiten erforderlich, um das Scala-Paket zur Spark-Umgebung hinzuzufügen.
pip install pyspark-extension==2.13.0.3.5
Hinweis: Wählen Sie abhängig von Ihrer PySpark-Version die richtige Spark-Version (hier 3.5).
Es gibt viele Data-Science-Notizbücher. Um diese Bibliothek zu verwenden, fügen Sie Ihrem Notebook eine JAR-Abhängigkeit hinzu, indem Sie die folgenden Maven-Koordinaten verwenden:
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Oder laden Sie die JAR-Datei herunter und platzieren Sie sie in einem Dateisystem, in dem das Notebook darauf zugreifen kann, und verweisen Sie direkt auf die JAR-Datei.
Sehen Sie sich die Dokumentation Ihres bevorzugten Notebooks an, um zu erfahren, wie Sie Ihrer Spark-Umgebung JAR-Dateien hinzufügen.
Die meisten Funktionen werden in Python in Verbindung mit einem Spark Connect-Server nicht unterstützt. Dies gilt auch für die Databricks Runtime-Umgebung 13.x und höher. Details finden Sie in diesem Blog.
Der Aufruf einer dieser Funktionen bei Verbindung mit einem Spark Connect-Server führt zu folgendem Fehler:
This feature is not supported for Spark Connect.
Verwenden Sie stattdessen eine klassische Verbindung zu einem Spark-Cluster.
Sie können dieses Projekt mit verschiedenen Versionen von Spark und Scala erstellen.
Wenn Sie für eine Spark- oder Scala-Version erstellen möchten, die sich von der in der Datei pom.xml
definierten unterscheidet, führen Sie den Befehl aus
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
Wechseln Sie beispielsweise zu Spark 3.5.0 und Scala 2.13.8, indem Sie sh set-version.sh 3.5.0 2.13.8
ausführen.
Führen Sie dann mvn package
aus, um aus den Quellen ein JAR zu erstellen. Es kann in target/
gefunden werden.
Führen Sie die Scala-Tests über mvn test
aus.
Um die Python-Tests auszuführen, richten Sie eine Python-Umgebung wie folgt ein (ersetzen Sie [SCALA-COMPAT-VERSION]
und [SPARK-COMPAT-VERSION]
durch die entsprechenden Werte):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
Führen Sie die Python-Tests über env PYTHONPATH=python:python/test python -m pytest python/test
aus.
Hinweis: Sie müssen zuerst die Scala-Quellen erstellen.
Führen Sie die folgende Befehlsfolge im Projektstammverzeichnis aus:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
Führen Sie dann python -m build python/
aus, um aus den Quellen eine WHL zu erstellen. Es kann in python/dist/
gefunden werden.