Proyek ini menyediakan ekstensi ke proyek Apache Spark di Scala dan Python:
Diff: Transformasi dan aplikasi diff
untuk Dataset
yang menghitung perbedaan antara dua kumpulan data, yaitu baris mana yang akan ditambahkan , dihapus , atau diubah untuk berpindah dari satu kumpulan data ke kumpulan data lainnya.
SortedGroups: Transformasi groupByKey
yang mengelompokkan baris berdasarkan kunci sambil menyediakan iterator yang diurutkan untuk setiap grup. Mirip dengan Dataset.groupByKey.flatMapGroups
, tetapi dengan jaminan pesanan untuk iteratornya.
Histogram [*] : Transformasi histogram
yang menghitung histogram DataFrame untuk kolom nilai.
Nomor Baris Global [*] : Transformasi withRowNumbers
yang memberikan nomor baris global berdasarkan urutan Kumpulan Data saat ini, atau urutan tertentu. Berbeda dengan fungsi SQL row_number
yang sudah ada, yang memerlukan spesifikasi jendela, transformasi ini menyediakan nomor baris di seluruh Dataset tanpa masalah penskalaan.
Penulisan yang Dipartisi: Tindakan writePartitionedBy
menulis Dataset
Anda yang dipartisi dan ditata secara efisien dengan satu operasi.
Periksa file Parket [*] : Struktur file Parket (metadata, bukan data yang disimpan di Parket) dapat diperiksa mirip dengan alat parket atau parket-cli dengan membaca dari sumber data Spark sederhana. Hal ini menyederhanakan pengidentifikasian mengapa beberapa file Parket tidak dapat dipecah oleh Spark menjadi partisi yang dapat diskalakan.
Instal paket Python ke dalam pekerjaan PySpark [*] : Instal dependensi Python melalui PIP atau Puisi secara terprogram ke dalam pekerjaan PySpark Anda yang sedang berjalan (PySpark ≥ 3.1.0):
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
Panggilan metode lancar: T.call(transformation: T => R): R
: Mengubah transformasi T => R
, yang bukan bagian dari T
menjadi pemanggilan metode lancar di T
. Ini memungkinkan penulisan kode yang lancar seperti:
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
Panggilan metode bersyarat lancar: T.when(condition: Boolean).call(transformation: T => T): T
: Melakukan transformasi dengan lancar hanya jika kondisi yang diberikan benar. Ini memungkinkan penulisan kode yang lancar seperti:
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
Pintasan untuk groupBy.as : Memanggil Dataset.groupBy(Column*).as[K, T]
sebaiknya lebih disukai daripada memanggil Dataset.groupByKey(V => K)
bila memungkinkan. Yang pertama memungkinkan Catalyst untuk mengeksploitasi partisi dan pengurutan Dataset yang ada, sedangkan yang kedua menyembunyikan dari Catalyst kolom mana yang digunakan untuk membuat kunci. Hal ini dapat menimbulkan penalti kinerja yang signifikan.
Metode groupByKey[K](Column*)
berbasis ekspresi kolom yang baru mempermudah pengelompokan berdasarkan kunci ekspresi kolom. Alih-alih
ds.groupBy($"id").as[Int, V]
menggunakan:
ds.groupByKey[Int]($"id")
Backticks: backticks(string: String, strings: String*): String)
: Mengapit nama kolom tertentu dengan backticks ( `
) bila diperlukan. Ini adalah cara praktis untuk memastikan nama kolom dengan karakter khusus seperti titik ( .
) berfungsi dengan col()
atau select()
.
Hitung nilai nol: count_null(e: Column)
: fungsi agregasi seperti count
yang menghitung nilai nol di kolom e
. Ini setara dengan memanggil count(when(e.isNull, lit(1)))
.
.Net DateTime.Ticks [*] : Konversikan .Net (C#, F#, Visual Basic) DateTime.Ticks
menjadi stempel waktu, detik, dan nanodetik Spark.
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
Kebalikannya disediakan oleh (semua kutu LongType
.Net yang dikembalikan):
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
Metode ini juga tersedia dengan Python:
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Direktori sementara Spark [*] : Membuat direktori sementara yang akan dihapus saat aplikasi Spark dimatikan.
skala:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
ular piton:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
Deskripsi pekerjaan Spark [*] : Tetapkan deskripsi pekerjaan Spark untuk semua pekerjaan Spark dalam suatu konteks.
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
Tanpa deskripsi pekerjaan | Dengan deskripsi pekerjaan |
---|---|
Perhatikan bahwa mengatur deskripsi dalam satu thread sambil memanggil tindakan (misalnya .count
) di thread yang berbeda tidak akan berfungsi, kecuali thread yang berbeda dihasilkan dari thread saat ini setelah deskripsi ditetapkan.
Contoh kerja dengan koleksi paralel:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
Paket spark-extension
tersedia untuk semua versi Spark 3.2, 3.3, 3.4 dan 3.5. Beberapa versi Spark sebelumnya mungkin juga didukung. Versi paket memiliki semantik berikut: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
: Versi kompatibilitas biner Scala (minor). Tersedia 2.12
dan 2.13
.SPARK_COMPAT_VERSION
: Versi kompatibilitas biner Apache Spark (minor). Tersedia 3.2
, 3.3
, 3.4
dan 3.5
.VERSION
: Versi paket, misalnya 2.10.0
. Tambahkan baris ini ke file build.sbt
Anda:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
Tambahkan ketergantungan ini ke file pom.xml
Anda:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
Tambahkan ketergantungan ini ke file build.gradle
Anda:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
Kirimkan aplikasi Spark Anda dengan ketergantungan Spark Extension (versi ≥1.1.0) sebagai berikut:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
Catatan: Pilih versi Scala yang tepat (di sini 2.12) dan versi Spark (di sini 3.5) bergantung pada versi Spark Anda.
Luncurkan Spark Shell dengan ketergantungan Spark Extension (versi ≥1.1.0) sebagai berikut:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Catatan: Pilih versi Scala yang tepat (di sini 2.12) dan versi Spark (di sini 3.5) bergantung pada versi Spark Shell Anda.
Mulai sesi PySpark dengan ketergantungan Spark Extension (versi ≥1.1.0) sebagai berikut:
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
Catatan: Pilih versi Scala yang tepat (di sini 2.12) dan versi Spark (di sini 3.5) bergantung pada versi PySpark Anda.
Luncurkan Python Spark REPL dengan ketergantungan Spark Extension (versi ≥1.1.0) sebagai berikut:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Catatan: Pilih versi Scala yang tepat (di sini 2.12) dan versi Spark (di sini 3.5) bergantung pada versi PySpark Anda.
spark-submit
PySpark Jalankan skrip Python Anda yang menggunakan PySpark melalui spark-submit
:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
Catatan: Pilih versi Scala yang tepat (di sini 2.12) dan versi Spark (di sini 3.5) bergantung pada versi Spark Anda.
Anda mungkin ingin menginstal paket python pyspark-extension
dari PyPi ke lingkungan pengembangan Anda. Ini memberi Anda kemampuan penyelesaian kode, pengetikan, dan pengujian selama fase pengembangan Anda.
Menjalankan aplikasi Python Anda di kluster Spark masih memerlukan salah satu cara di atas untuk menambahkan paket Scala ke lingkungan Spark.
pip install pyspark-extension==2.13.0.3.5
Catatan: Pilih versi Spark yang tepat (di sini 3.5) bergantung pada versi PySpark Anda.
Ada banyak buku catatan Ilmu Data. Untuk menggunakan perpustakaan ini, tambahkan ketergantungan jar ke buku catatan Anda menggunakan koordinat Maven berikut :
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
Atau unduh jar dan letakkan di sistem file yang dapat diakses oleh notebook, dan referensikan file jar tersebut secara langsung.
Periksa dokumentasi buku catatan favorit Anda untuk mempelajari cara menambahkan jar ke lingkungan Spark Anda.
Sebagian besar fitur tidak didukung di Python bersama dengan server Spark Connect. Ini juga berlaku untuk lingkungan Databricks Runtime 13.x dan yang lebih baru. Detailnya dapat ditemukan di blog ini.
Memanggil salah satu fitur tersebut saat tersambung ke server Spark Connect akan memunculkan kesalahan ini:
This feature is not supported for Spark Connect.
Gunakan koneksi klasik ke kluster Spark sebagai gantinya.
Anda dapat membangun proyek ini dengan versi Spark dan Scala yang berbeda.
Jika Anda ingin membuat versi Spark atau Scala yang berbeda dengan yang ditentukan dalam file pom.xml
, jalankan
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
Misalnya, beralih ke Spark 3.5.0 dan Scala 2.13.8 dengan menjalankan sh set-version.sh 3.5.0 2.13.8
.
Kemudian jalankan mvn package
untuk membuat toples dari sumbernya. Itu dapat ditemukan di target/
.
Jalankan tes Scala melalui mvn test
.
Untuk menjalankan pengujian Python, siapkan lingkungan Python sebagai berikut (ganti [SCALA-COMPAT-VERSION]
dan [SPARK-COMPAT-VERSION]
dengan nilai masing-masing):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
Jalankan tes Python melalui env PYTHONPATH=python:python/test python -m pytest python/test
.
Catatan: Anda harus membuat sumber Scala terlebih dahulu.
Jalankan urutan perintah berikut di direktori root proyek:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
Kemudian jalankan python -m build python/
untuk membuat whl dari sumbernya. Itu dapat ditemukan di python/dist/
.