โปรเจ็กต์นี้มีส่วนขยายสำหรับโปรเจ็กต์ Apache Spark ใน Scala และ Python:
ความแตกต่าง: การแปลง diff
และแอปพลิเคชันสำหรับ Dataset
ที่จะคำนวณความแตกต่างระหว่างชุดข้อมูลสองชุด กล่าวคือ แถวใดที่จะ เพิ่ม ลบ หรือ เปลี่ยนแปลง เพื่อรับจากชุดข้อมูลหนึ่งไปยังอีกชุดหนึ่ง
SortedGroups: การแปลง groupByKey
ที่จัดกลุ่มแถวด้วยคีย์ในขณะที่จัดเตรียมตัววนซ้ำ ที่เรียงลำดับ สำหรับแต่ละกลุ่ม คล้ายกับ Dataset.groupByKey.flatMapGroups
แต่มีการรับประกันคำสั่งซื้อสำหรับผู้วนซ้ำ
ฮิสโตแกรม [*] : การแปลง histogram
ที่คำนวณฮิสโตแกรม DataFrame สำหรับคอลัมน์ค่า
Global Row Number [*] : การแปลง withRowNumbers
ที่ให้หมายเลขแถวส่วนกลางแสดงลำดับปัจจุบันของชุดข้อมูลหรือลำดับใดๆ ที่กำหนด ตรงกันข้ามกับฟังก์ชัน SQL ที่มีอยู่ row_number
ซึ่งต้องใช้ข้อมูลจำเพาะของหน้าต่าง การแปลงนี้จะให้หมายเลขแถวทั่วทั้งชุดข้อมูลโดยไม่มีปัญหาในการปรับขนาด
การเขียนแบบแบ่งพาร์ติชัน: การดำเนินการ writePartitionedBy
จะเขียน Dataset
ของคุณโดยแบ่งพาร์ติชันและจัดวางอย่างมีประสิทธิภาพด้วยการดำเนินการเพียงครั้งเดียว
ตรวจสอบไฟล์ Parquet [*] : สามารถตรวจสอบโครงสร้างของไฟล์ Parquet (ข้อมูลเมตา ไม่ใช่ข้อมูลที่จัดเก็บไว้ใน Parquet) สามารถตรวจสอบได้คล้ายกับ parquet-tools หรือ parquet-cli โดยการอ่านจากแหล่งข้อมูล Spark แบบธรรมดา ช่วยให้ระบุได้ง่ายขึ้นว่าเหตุใดไฟล์ Parquet บางไฟล์จึงไม่สามารถแบ่งโดย Spark ออกเป็นพาร์ติชันที่ปรับขนาดได้
ติดตั้งแพ็คเกจ Python ในงาน PySpark [*] : ติดตั้งการพึ่งพา Python ผ่าน PIP หรือ Poetry โดยทางโปรแกรมในงาน PySpark ที่กำลังรันอยู่ (PySpark ≥ 3.1.0):
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
การเรียกเมธอดอย่างคล่องแคล่ว: T.call(transformation: T => R): R
: เปลี่ยนการแปลง T => R
ซึ่งไม่ได้เป็นส่วนหนึ่งของ T
เป็นการเรียกเมธอดอย่างคล่องแคล่วบน T
ช่วยให้เขียนโค้ดได้คล่อง เช่น:
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
การเรียกเมธอดแบบมีเงื่อนไขอย่างคล่องแคล่ว: T.when(condition: Boolean).call(transformation: T => T): T
: ทำการแปลงอย่างคล่องเฉพาะในกรณีที่เงื่อนไขที่กำหนดเป็นจริง ช่วยให้เขียนโค้ดได้คล่อง เช่น:
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
ทางลัดสำหรับ groupBy.as : ควรเลือกใช้การเรียก Dataset.groupBy(Column*).as[K, T]
มากกว่าการเรียก Dataset.groupByKey(V => K)
ทุกครั้งที่เป็นไปได้ แบบแรกอนุญาตให้ Catalyst ใช้ประโยชน์จากการแบ่งพาร์ติชันที่มีอยู่และการเรียงลำดับชุดข้อมูล ในขณะที่แบบหลังซ่อนจาก Catalyst ว่าคอลัมน์ใดที่ใช้ในการสร้างคีย์ สิ่งนี้อาจมีโทษด้านประสิทธิภาพอย่างมาก
เมธอด groupByKey[K](Column*)
ที่ใช้นิพจน์คอลัมน์ใหม่ช่วยให้จัดกลุ่มตามคีย์นิพจน์คอลัมน์ได้ง่ายขึ้น แทน
ds.groupBy($"id").as[Int, V]
ใช้:
ds.groupByKey[Int]($"id")
Backticks: backticks(string: String, strings: String*): String)
: ล้อมรอบชื่อคอลัมน์ที่กำหนดด้วย backticks ( `
) เมื่อจำเป็น นี่เป็นวิธีที่สะดวกเพื่อให้แน่ใจว่าชื่อคอลัมน์ที่มีอักขระพิเศษ เช่น จุด ( .
) จะทำงานร่วมกับ col()
หรือ select()
ได้
นับค่า Null: count_null(e: Column)
: ฟังก์ชันการรวม เช่น count
ที่นับค่า Null ในคอลัมน์ e
สิ่งนี้เทียบเท่ากับการโทร count(when(e.isNull, lit(1)))
.Net DateTime.Ticks [*] : แปลง .Net (C#, F#, Visual Basic) DateTime.Ticks
เป็นการประทับเวลา Spark วินาที และนาโนวินาที
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
การย้อนกลับจัดทำโดย (เห็บ LongType
.Net ที่ส่งคืนทั้งหมด):
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
วิธีการเหล่านี้มีอยู่ใน Python ด้วย:
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Spark ไดเร็กทอรีชั่วคราว [*] : สร้างไดเร็กทอรีชั่วคราวที่จะถูกลบออกเมื่อปิดแอปพลิเคชัน Spark
สกาล่า:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
หลาม:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
คำอธิบายงาน Spark [*] : ตั้งค่าคำอธิบายงาน Spark สำหรับงาน Spark ทั้งหมดภายในบริบท
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
โดยไม่มีรายละเอียดงาน | พร้อมรายละเอียดงาน |
---|---|
โปรดทราบว่าการตั้งค่าคำอธิบายในเธรดหนึ่งในขณะที่เรียกใช้การดำเนินการ (เช่น .count
) ในเธรดอื่นจะไม่ทำงาน เว้นแต่เธรดอื่นจะถูกสร้างขึ้นจากเธรดปัจจุบัน หลังจาก ตั้งค่าคำอธิบายแล้ว
ตัวอย่างการทำงานกับคอลเลกชันแบบขนาน:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
แพ็คเกจ spark-extension
มีให้ใน Spark 3.2, 3.3, 3.4 และ 3.5 ทุกรุ่น Spark เวอร์ชันก่อนหน้าบางเวอร์ชันอาจได้รับการรองรับเช่นกัน เวอร์ชันแพ็คเกจมีความหมายดังต่อไปนี้: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
: เวอร์ชันความเข้ากันได้ของไบนารี Scala (รอง) มีให้เลือกทั้ง 2.12
และ 2.13
SPARK_COMPAT_VERSION
: เวอร์ชันความเข้ากันได้ของไบนารี Apache Spark (รอง) มีให้เลือกทั้ง 3.2
, 3.3
, 3.4
และ 3.5
VERSION
: เวอร์ชันแพ็คเกจ เช่น 2.10.0
เพิ่มบรรทัดนี้ลงในไฟล์ build.sbt
ของคุณ:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
เพิ่มการพึ่งพานี้ให้กับไฟล์ pom.xml
ของคุณ:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
เพิ่มการพึ่งพานี้ให้กับไฟล์ build.gradle
ของคุณ:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
ส่งแอป Spark ของคุณที่มีการพึ่งพา Spark Extension (เวอร์ชัน ≥1.1.0) ดังนี้:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
หมายเหตุ: เลือกเวอร์ชัน Scala ที่ถูกต้อง (ที่นี่ 2.12) และเวอร์ชัน Spark (ที่นี่ 3.5) ขึ้นอยู่กับเวอร์ชัน Spark ของคุณ
เรียกใช้ Spark Shell ด้วยการพึ่งพา Spark Extension (เวอร์ชัน ≥1.1.0) ดังนี้:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
หมายเหตุ: เลือกเวอร์ชัน Scala ที่ถูกต้อง (ที่นี่ 2.12) และเวอร์ชัน Spark (ที่นี่ 3.5) ขึ้นอยู่กับเวอร์ชัน Spark Shell ของคุณ
เริ่มเซสชัน PySpark ด้วยการพึ่งพา Spark Extension (เวอร์ชัน ≥1.1.0) ดังนี้:
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
หมายเหตุ: เลือกเวอร์ชัน Scala ที่ถูกต้อง (ที่นี่ 2.12) และเวอร์ชัน Spark (ที่นี่ 3.5) ขึ้นอยู่กับเวอร์ชัน PySpark ของคุณ
เรียกใช้ Python Spark REPL ด้วยการพึ่งพา Spark Extension (เวอร์ชัน ≥1.1.0) ดังนี้:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
หมายเหตุ: เลือกเวอร์ชัน Scala ที่ถูกต้อง (ที่นี่ 2.12) และเวอร์ชัน Spark (ที่นี่ 3.5) ขึ้นอยู่กับเวอร์ชัน PySpark ของคุณ
spark-submit
รันสคริปต์ Python ของคุณที่ใช้ PySpark ผ่าน spark-submit
:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
หมายเหตุ: เลือกเวอร์ชัน Scala ที่ถูกต้อง (ที่นี่ 2.12) และเวอร์ชัน Spark (ที่นี่ 3.5) ขึ้นอยู่กับเวอร์ชัน Spark ของคุณ
คุณอาจต้องการติดตั้งแพ็คเกจ pyspark-extension
python จาก PyPi ลงในสภาพแวดล้อมการพัฒนาของคุณ ซึ่งจะช่วยให้คุณสามารถกรอกโค้ด พิมพ์ และทดสอบความสามารถในระหว่างขั้นตอนการพัฒนาของคุณได้
การเรียกใช้แอปพลิเคชัน Python บนคลัสเตอร์ Spark ยังคงต้องใช้วิธีใดวิธีหนึ่งข้างต้นในการเพิ่มแพ็คเกจ Scala ลงในสภาพแวดล้อม Spark
pip install pyspark-extension==2.13.0.3.5
หมายเหตุ: เลือกเวอร์ชัน Spark ที่ถูกต้อง (ที่นี่ 3.5) ขึ้นอยู่กับเวอร์ชัน PySpark ของคุณ
มีสมุดบันทึก Data Science มากมายอยู่รอบตัว หากต้องการใช้ไลบรารีนี้ ให้เพิ่ม การพึ่งพา jar ให้กับสมุดบันทึกของคุณโดยใช้ พิกัด Maven เหล่านี้ :
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
หรือดาวน์โหลด jar แล้ววางลงบนระบบไฟล์ที่โน้ตบุ๊กสามารถเข้าถึงได้ และอ้างอิงไฟล์ jar นั้นโดยตรง
ตรวจสอบเอกสารประกอบของสมุดบันทึกที่คุณชื่นชอบเพื่อเรียนรู้วิธีเพิ่ม Jars ให้กับสภาพแวดล้อม Spark ของคุณ
คุณสมบัติส่วนใหญ่ไม่รองรับ Python เมื่อใช้ร่วมกับเซิร์ฟเวอร์ Spark Connect สิ่งนี้ยังใช้สำหรับสภาพแวดล้อม Databricks Runtime 13.x ขึ้นไป รายละเอียดสามารถพบได้ในบล็อกนี้
การเรียกคุณสมบัติใดๆ เหล่านั้นเมื่อเชื่อมต่อกับเซิร์ฟเวอร์ Spark Connect จะทำให้เกิดข้อผิดพลาดนี้:
This feature is not supported for Spark Connect.
ใช้การเชื่อมต่อแบบคลาสสิกกับคลัสเตอร์ Spark แทน
คุณสามารถสร้างโปรเจ็กต์นี้โดยใช้ Spark และ Scala เวอร์ชันต่างๆ ได้
หากคุณต้องการสร้างเวอร์ชัน Spark หรือ Scala ที่แตกต่างจากที่กำหนดไว้ในไฟล์ pom.xml
ให้รัน
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
ตัวอย่างเช่น เปลี่ยนไปใช้ Spark 3.5.0 และ Scala 2.13.8 โดยเรียกใช้ sh set-version.sh 3.5.0 2.13.8
จากนั้นดำเนินการ mvn package
เพื่อสร้าง jar จากแหล่งที่มา สามารถพบได้ใน target/
.
รันการทดสอบ Scala ผ่าน mvn test
ในการรันการทดสอบ Python ให้ตั้งค่าสภาพแวดล้อม Python ดังต่อไปนี้ (แทนที่ [SCALA-COMPAT-VERSION]
และ [SPARK-COMPAT-VERSION]
ด้วยค่าตามลำดับ):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
รันการทดสอบ Python ผ่าน env PYTHONPATH=python:python/test python -m pytest python/test
หมายเหตุ: คุณต้องสร้างซอร์ส Scala ก่อน
รันลำดับของคำสั่งต่อไปนี้ในไดเร็กทอรีรากของโปรเจ็กต์:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
จากนั้นดำเนินการ python -m build python/
เพื่อสร้าง whl จากแหล่งที่มา มันสามารถพบได้ใน python/dist/