Ce projet fournit des extensions au projet Apache Spark en Scala et Python :
Diff : une transformation diff
et une application pour Dataset
qui calculent les différences entre deux ensembles de données, c'est-à-dire quelles lignes ajouter , supprimer ou modifier pour passer d'un ensemble de données à l'autre.
SortedGroups : une transformation groupByKey
qui regroupe les lignes par clé tout en fournissant un itérateur trié pour chaque groupe. Similaire à Dataset.groupByKey.flatMapGroups
, mais avec des garanties d'ordre pour l'itérateur.
Histogramme [*] : Une transformation histogram
qui calcule l'histogramme DataFrame pour une colonne de valeur.
Global Row Number [*] : une transformation withRowNumbers
qui fournit le numéro de ligne global par rapport à l'ordre actuel de l'ensemble de données, ou à tout ordre donné. Contrairement à la fonction SQL existante row_number
, qui nécessite une spécification de fenêtre, cette transformation fournit le numéro de ligne sur l'ensemble de données sans problèmes de mise à l'échelle.
Écriture partitionnée : l'action writePartitionedBy
écrit votre Dataset
partitionné et organisé efficacement en une seule opération.
Inspecter les fichiers Parquet [*] : La structure des fichiers Parquet (les métadonnées, pas les données stockées dans Parquet) peut être inspectée de la même manière que parquet-tools ou parquet-cli en lisant à partir d'une simple source de données Spark. Cela simplifie l'identification des raisons pour lesquelles certains fichiers Parquet ne peuvent pas être divisés par Spark en partitions évolutives.
Installez les packages Python dans le travail PySpark [*] : installez les dépendances Python via PIP ou Poetry par programmation dans votre travail PySpark en cours d'exécution (PySpark ≥ 3.1.0) :
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
Appel de méthode fluide : T.call(transformation: T => R): R
: Transforme une transformation T => R
, qui ne fait pas partie de T
en un appel de méthode fluide sur T
. Cela permet d'écrire du code fluide comme :
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
Appel de méthode conditionnelle fluide : T.when(condition: Boolean).call(transformation: T => T): T
: Effectuer une transformation couramment uniquement si la condition donnée est vraie. Cela permet d'écrire du code fluide comme :
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
Raccourci pour groupBy.as : l'appel de Dataset.groupBy(Column*).as[K, T]
doit être préféré à l'appel Dataset.groupByKey(V => K)
chaque fois que cela est possible. Le premier permet à Catalyst d'exploiter le partitionnement et l'ordre existants de l'ensemble de données, tandis que le second cache à Catalyst les colonnes utilisées pour créer les clés. Cela peut entraîner une baisse significative des performances.
La nouvelle méthode groupByKey[K](Column*)
basée sur l'expression de colonne facilite le regroupement par clé d'expression de colonne. Au lieu de
ds.groupBy($"id").as[Int, V]
utiliser:
ds.groupByKey[Int]($"id")
Backticks: backticks(string: String, strings: String*): String)
: entoure le nom de colonne donné avec des backticks ( `
) si nécessaire. C'est un moyen pratique de garantir que les noms de colonnes avec des caractères spéciaux comme des points ( .
) fonctionnent avec col()
ou select()
.
Compter les valeurs nulles : count_null(e: Column)
: une fonction d'agrégation comme count
qui compte les valeurs nulles dans la colonne e
. Cela équivaut à appeler count(when(e.isNull, lit(1)))
.
.Net DateTime.Ticks [*] : Convertissez .Net (C#, F#, Visual Basic) DateTime.Ticks
en horodatages Spark, secondes et nanosecondes.
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
L’inverse est fourni par (tous renvoient des ticks LongType
.Net) :
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
Ces méthodes sont également disponibles en Python :
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Répertoire temporaire Spark [*] : Créez un répertoire temporaire qui sera supprimé à l'arrêt de l'application Spark.
Échelle :
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
Python:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
Description du travail Spark [*] : Définissez la description du travail Spark pour tous les travaux Spark dans un contexte.
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
Sans description de poste | Avec description de poste |
---|---|
Notez que définir une description dans un thread tout en appelant l'action (par exemple .count
) dans un autre thread ne fonctionne pas, à moins que le thread différent ne soit généré à partir du thread actuel après que la description a été définie.
Exemple de travail avec des collections parallèles :
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
Le package spark-extension
est disponible pour toutes les versions Spark 3.2, 3.3, 3.4 et 3.5. Certaines versions antérieures de Spark peuvent également être prises en charge. La version du package a la sémantique suivante : spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
: version (mineure) de compatibilité binaire Scala. 2.12
et 2.13
sont disponibles.SPARK_COMPAT_VERSION
: version (mineure) de compatibilité binaire d'Apache Spark. 3.2
, 3.3
, 3.4
et 3.5
sont disponibles.VERSION
: La version du package, par exemple 2.10.0
. Ajoutez cette ligne à votre fichier build.sbt
:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
Ajoutez cette dépendance à votre fichier pom.xml
:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
Ajoutez cette dépendance à votre fichier build.gradle
:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
Soumettez votre application Spark avec la dépendance Spark Extension (version ≥1.1.0) comme suit :
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
Remarque : Choisissez la bonne version Scala (ici 2.12) et la bonne version Spark (ici 3.5) en fonction de votre version Spark.
Lancez un Spark Shell avec la dépendance Spark Extension (version ≥1.1.0) comme suit :
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Remarque : Choisissez la bonne version de Scala (ici 2.12) et de Spark (ici 3.5) en fonction de votre version de Spark Shell.
Démarrez une session PySpark avec la dépendance Spark Extension (version ≥1.1.0) comme suit :
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
Remarque : Choisissez la bonne version de Scala (ici 2.12) et de Spark (ici 3.5) en fonction de votre version de PySpark.
Lancez Python Spark REPL avec la dépendance Spark Extension (version ≥1.1.0) comme suit :
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Remarque : Choisissez la bonne version de Scala (ici 2.12) et de Spark (ici 3.5) en fonction de votre version de PySpark.
spark-submit
PySpark Exécutez vos scripts Python qui utilisent PySpark via spark-submit
:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
Remarque : Choisissez la bonne version Scala (ici 2.12) et la bonne version Spark (ici 3.5) en fonction de votre version Spark.
Vous souhaiterez peut-être installer le package python pyspark-extension
de PyPi dans votre environnement de développement. Cela vous offre des capacités de complétion de code, de saisie et de test pendant votre phase de développement.
L'exécution de votre application Python sur un cluster Spark nécessitera toujours l'une des méthodes ci-dessus pour ajouter le package Scala à l'environnement Spark.
pip install pyspark-extension==2.13.0.3.5
Remarque : Choisissez la bonne version de Spark (ici 3.5) en fonction de votre version de PySpark.
Il existe de nombreux cahiers de science des données. Pour utiliser cette bibliothèque, ajoutez une dépendance jar à votre notebook en utilisant ces coordonnées Maven :
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Ou téléchargez le fichier jar et placez-le sur un système de fichiers où il est accessible par le bloc-notes, et référencez directement ce fichier jar.
Consultez la documentation de votre notebook préféré pour savoir comment ajouter des fichiers jar à votre environnement Spark.
La plupart des fonctionnalités ne sont pas prises en charge dans Python en conjonction avec un serveur Spark Connect. Cela vaut également pour l’environnement Databricks Runtime 13.x et supérieur. Les détails peuvent être trouvés sur ce blog.
L’appel de l’une de ces fonctionnalités lorsque vous êtes connecté à un serveur Spark Connect générera cette erreur :
This feature is not supported for Spark Connect.
Utilisez plutôt une connexion classique à un cluster Spark.
Vous pouvez créer ce projet avec différentes versions de Spark et Scala.
Si vous souhaitez créer pour une version Spark ou Scala différente de celle définie dans le fichier pom.xml
, exécutez
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
Par exemple, passez à Spark 3.5.0 et Scala 2.13.8 en exécutant sh set-version.sh 3.5.0 2.13.8
.
Exécutez ensuite mvn package
pour créer un fichier jar à partir des sources. Il peut être trouvé dans target/
.
Exécutez les tests Scala via mvn test
.
Afin d'exécuter les tests Python, configurez un environnement Python comme suit (remplacez [SCALA-COMPAT-VERSION]
et [SPARK-COMPAT-VERSION]
par les valeurs respectives) :
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
Exécutez les tests Python via env PYTHONPATH=python:python/test python -m pytest python/test
.
Remarque : vous devez d'abord créer les sources Scala.
Exécutez la séquence de commandes suivante dans le répertoire racine du projet :
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
Exécutez ensuite python -m build python/
pour créer un whl à partir des sources. Il peut être trouvé dans python/dist/
.