Este proyecto proporciona extensiones al proyecto Apache Spark en Scala y Python:
Diff: una transformación y aplicación de diff
para Dataset
que calcula las diferencias entre dos conjuntos de datos, es decir, qué filas agregar , eliminar o cambiar para pasar de un conjunto de datos a otro.
SortedGroups: una transformación groupByKey
que agrupa filas por una clave y al mismo tiempo proporciona un iterador ordenado para cada grupo. Similar a Dataset.groupByKey.flatMapGroups
, pero con garantías de orden para el iterador.
Histograma [*] : una transformación histogram
que calcula el marco de datos del histograma para una columna de valor.
Número de fila global [*] : una transformación withRowNumbers
que proporciona el número de fila global en relación con el orden actual del conjunto de datos o cualquier orden determinado. A diferencia de la función SQL existente row_number
, que requiere una especificación de ventana, esta transformación proporciona el número de fila en todo el conjunto de datos sin problemas de escala.
Escritura particionada: la acción writePartitionedBy
escribe su Dataset
particionado y distribuido de manera eficiente con una sola operación.
Inspeccionar archivos Parquet [*] : la estructura de los archivos Parquet (los metadatos, no los datos almacenados en Parquet) se puede inspeccionar de manera similar a parquet-tools o parquet-cli leyendo desde una fuente de datos Spark simple. Esto simplifica la identificación de por qué Spark no puede dividir algunos archivos Parquet en particiones escalables.
Instale paquetes de Python en el trabajo de PySpark [*] : instale las dependencias de Python mediante PIP o Poetry mediante programación en su trabajo de PySpark en ejecución (PySpark ≥ 3.1.0):
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
Llamada fluida al método: T.call(transformation: T => R): R
: Convierte una transformación T => R
, que no es parte de T
, en una llamada fluida al método en T
. Esto permite escribir código fluido como:
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
Llamada fluida al método condicional: T.when(condition: Boolean).call(transformation: T => T): T
: Realiza una transformación con fluidez solo si la condición dada es verdadera. Esto permite escribir código fluido como:
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
Acceso directo a groupBy.as : Se debe preferir llamar Dataset.groupBy(Column*).as[K, T]
a llamar Dataset.groupByKey(V => K)
siempre que sea posible. El primero permite a Catalyst explotar la partición y el ordenamiento existentes del conjunto de datos, mientras que el segundo oculta a Catalyst qué columnas se utilizan para crear las claves. Esto puede tener una importante penalización en el rendimiento.
El nuevo método groupByKey[K](Column*)
basado en expresiones de columna facilita la agrupación por una clave de expresión de columna. En lugar de
ds.groupBy($"id").as[Int, V]
usar:
ds.groupByKey[Int]($"id")
Comillas invertidas: backticks(string: String, strings: String*): String)
: encierra el nombre de la columna dada con comillas invertidas ( `
) cuando sea necesario. Esta es una forma práctica de garantizar que los nombres de columnas con caracteres especiales como puntos ( .
) funcionen con col()
o select()
.
Contar valores nulos: count_null(e: Column)
: una función de agregación como count
que cuenta valores nulos en la columna e
. Esto es equivalente a llamar count(when(e.isNull, lit(1)))
.
.Net DateTime.Ticks [*] : convierte .Net (C#, F#, Visual Basic) DateTime.Ticks
en marcas de tiempo, segundos y nanosegundos de Spark.
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
Lo contrario lo proporciona (todos devuelven ticks LongType
.Net):
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
Estos métodos también están disponibles en Python:
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Directorio temporal de Spark [*] : cree un directorio temporal que se eliminará al cerrar la aplicación Spark.
Escala:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
Pitón:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
Descripción del trabajo de Spark [*] : establece la descripción del trabajo de Spark para todos los trabajos de Spark dentro de un contexto.
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
Sin descripción del trabajo | Con descripción del puesto |
---|---|
Tenga en cuenta que establecer una descripción en un hilo mientras se llama a la acción (por ejemplo, .count
) en un hilo diferente no funciona, a menos que el hilo diferente se genere a partir del hilo actual después de que se haya establecido la descripción.
Ejemplo de trabajo con colecciones paralelas:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
El paquete spark-extension
está disponible para todas las versiones Spark 3.2, 3.3, 3.4 y 3.5. Es posible que también se admitan algunas versiones anteriores de Spark. La versión del paquete tiene la siguiente semántica: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
: versión (menor) de compatibilidad binaria de Scala. Disponibles son 2.12
y 2.13
.SPARK_COMPAT_VERSION
: versión (menor) de compatibilidad binaria de Apache Spark. Disponibles son 3.2
, 3.3
, 3.4
y 3.5
.VERSION
: La versión del paquete, por ejemplo, 2.10.0
. Agregue esta línea a su archivo build.sbt
:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
Agregue esta dependencia a su archivo pom.xml
:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
Agregue esta dependencia a su archivo build.gradle
:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
Envíe su aplicación Spark con la dependencia de Spark Extension (versión ≥1.1.0) de la siguiente manera:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
Nota: Elija la versión de Scala (aquí 2.12) y la versión de Spark (aquí 3.5) correctas según su versión de Spark.
Inicie Spark Shell con la dependencia de Spark Extension (versión ≥1.1.0) de la siguiente manera:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Nota: Elija la versión de Scala (aquí 2.12) y la versión de Spark (aquí 3.5) correctas según su versión de Spark Shell.
Inicie una sesión de PySpark con la dependencia de Spark Extension (versión ≥1.1.0) de la siguiente manera:
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
Nota: Elija la versión de Scala (aquí 2.12) y la versión de Spark (aquí 3.5) correctas según su versión de PySpark.
Inicie Python Spark REPL con la dependencia de Spark Extension (versión ≥1.1.0) de la siguiente manera:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Nota: Elija la versión de Scala (aquí 2.12) y la versión de Spark (aquí 3.5) correctas según su versión de PySpark.
spark-submit
de PySpark Ejecute sus scripts de Python que usan PySpark mediante spark-submit
:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
Nota: Elija la versión correcta de Scala (aquí 2.12) y la versión de Spark (aquí 3.5) según su versión de Spark.
Es posible que desee instalar el paquete Python pyspark-extension
de PyPi en su entorno de desarrollo. Esto le proporciona capacidades de finalización, escritura y prueba de código durante su fase de desarrollo.
Ejecutar su aplicación Python en un clúster Spark aún requerirá una de las formas anteriores para agregar el paquete Scala al entorno Spark.
pip install pyspark-extension==2.13.0.3.5
Nota: Elija la versión correcta de Spark (aquí 3.5) según su versión de PySpark.
Hay muchos cuadernos de ciencia de datos disponibles. Para usar esta biblioteca, agregue una dependencia jar a su cuaderno usando estas coordenadas Maven :
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
O descargue el jar y colóquelo en un sistema de archivos al que pueda acceder el portátil, y haga referencia a ese archivo jar directamente.
Consulte la documentación de su cuaderno favorito para aprender cómo agregar archivos jar a su entorno Spark.
La mayoría de las funciones no son compatibles con Python junto con un servidor Spark Connect. Esto también es válido para el entorno de Databricks Runtime 13.x y versiones posteriores. Los detalles se pueden encontrar en este blog.
Llamar a cualquiera de esas funciones cuando esté conectado a un servidor Spark Connect generará este error:
This feature is not supported for Spark Connect.
Utilice una conexión clásica a un clúster Spark en su lugar.
Puede construir este proyecto con diferentes versiones de Spark y Scala.
Si desea compilar para una versión de Spark o Scala diferente a la definida en el archivo pom.xml
, ejecute
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
Por ejemplo, cambie a Spark 3.5.0 y Scala 2.13.8 ejecutando sh set-version.sh 3.5.0 2.13.8
.
Luego ejecute mvn package
para crear un jar a partir de las fuentes. Se puede encontrar en target/
.
Ejecute las pruebas de Scala mediante mvn test
.
Para ejecutar las pruebas de Python, configure un entorno Python de la siguiente manera (reemplace [SCALA-COMPAT-VERSION]
y [SPARK-COMPAT-VERSION]
con los valores respectivos):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
Ejecute las pruebas de Python a través de env PYTHONPATH=python:python/test python -m pytest python/test
.
Nota: primero debes construir las fuentes de Scala.
Ejecute la siguiente secuencia de comandos en el directorio raíz del proyecto:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
Luego ejecute python -m build python/
para crear un whl a partir de las fuentes. Se puede encontrar en python/dist/
.