يوفر هذا المشروع امتدادات لمشروع Apache Spark في Scala وPython:
Diff: تحويل وتطبيق diff
Dataset
التي تحسب الاختلافات بين مجموعتي بيانات، أي الصفوف التي يجب إضافتها أو حذفها أو تغييرها للانتقال من مجموعة بيانات إلى أخرى.
SortedGroups: تحويل groupByKey
يقوم بتجميع الصفوف حسب مفتاح مع توفير مكرر تم فرزه لكل مجموعة. مشابه لـ Dataset.groupByKey.flatMapGroups
، ولكن مع ضمانات الطلب للمكرر.
الرسم البياني [*] : تحويل histogram
الذي يحسب DataFrame الرسم البياني لعمود القيمة.
رقم الصف العالمي [*] : تحويل withRowNumbers
الذي يوفر رقم الصف العام حسب الترتيب الحالي لمجموعة البيانات، أو أي ترتيب محدد. على النقيض من دالة SQL الموجودة row_number
، والتي تتطلب مواصفات نافذة، يوفر هذا التحويل رقم الصف عبر مجموعة البيانات بأكملها دون مشاكل في القياس.
الكتابة المقسمة: يقوم الإجراء writePartitionedBy
بكتابة Dataset
الخاصة بك مقسمة ومصممة بكفاءة من خلال عملية واحدة.
فحص ملفات Parquet [*] : يمكن فحص بنية ملفات Parquet (البيانات الوصفية، وليس البيانات المخزنة في Parquet) بشكل مشابه لأدوات الباركيه أو parquet-cli من خلال القراءة من مصدر بيانات Spark بسيط. يؤدي هذا إلى تبسيط عملية تحديد سبب عدم إمكانية تقسيم بعض ملفات Parquet بواسطة Spark إلى أقسام قابلة للتطوير.
تثبيت حزم Python في وظيفة PySpark [*] : قم بتثبيت تبعيات Python عبر PIP أو Poetry برمجيًا في وظيفة PySpark قيد التشغيل (PySpark ≥ 3.1.0):
# noinspection PyUnresolvedReferences
from gresearch . spark import *
# using PIP
spark . install_pip_package ( "pandas==1.4.3" , "pyarrow" )
spark . install_pip_package ( "-r" , "requirements.txt" )
# using Poetry
spark . install_poetry_project ( "../my-poetry-project/" , poetry_python = "../venv-poetry/bin/python" )
استدعاء أسلوب بطلاقة: T.call(transformation: T => R): R
: تحويل تحويل T => R
، الذي ليس جزءًا من T
إلى استدعاء أسلوب بطلاقة على T
. وهذا يسمح بكتابة تعليمات برمجية بطلاقة مثل:
import uk . co . gresearch . _
i.doThis()
.doThat()
.call(transformation)
.doMore()
استدعاء الطريقة الشرطية بطلاقة: T.when(condition: Boolean).call(transformation: T => T): T
: قم بإجراء التحويل بطلاقة فقط إذا كان الشرط المحدد صحيحًا. وهذا يسمح بكتابة تعليمات برمجية بطلاقة مثل:
import uk . co . gresearch . _
i.doThis()
.doThat()
.when(condition).call(transformation)
.doMore()
اختصار groupBy.as : يجب تفضيل استدعاء Dataset.groupBy(Column*).as[K, T]
بدلاً من استدعاء Dataset.groupByKey(V => K)
كلما أمكن ذلك. يسمح الأول لـ Catalyst باستغلال التقسيم والترتيب الحالي لمجموعة البيانات، بينما يخفي الأخير من Catalyst الأعمدة المستخدمة لإنشاء المفاتيح. يمكن أن يكون لهذا عقوبة أداء كبيرة.
تعمل الطريقة الجديدة groupByKey[K](Column*)
المستندة إلى تعبير العمود على تسهيل التجميع بواسطة مفتاح تعبير العمود. بدلاً من
ds.groupBy($"id").as[Int, V]
يستخدم:
ds.groupByKey[Int]($"id")
Backticks: backticks(string: String, strings: String*): String)
: يحيط اسم العمود المحدد بعلامات التحديد الخلفية ( `
) عند الحاجة. هذه طريقة سهلة للتأكد من أن أسماء الأعمدة ذات الأحرف الخاصة مثل النقاط ( .
) تعمل مع col()
أو select()
.
حساب القيم الخالية: count_null(e: Column)
: دالة تجميع مثل count
الذي يحسب القيم الخالية في العمود e
. وهذا يعادل استدعاء count(when(e.isNull, lit(1)))
.
.Net DateTime.Ticks [*] : تحويل .Net (C#، F#، Visual Basic) DateTime.Ticks
إلى طوابع زمنية Spark، وثواني ونانو ثانية.
// Scala
dotNetTicksToTimestamp( Column ) : Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch( Column ) : Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos( Column ) : Column // returns Unix epoch nanoseconds as LongType
يتم توفير العكس من خلال (جميع علامات التجزئة LongType
.Net المرتجعة):
// Scala
timestampToDotNetTicks( Column ) : Column
unixEpochToDotNetTicks( Column ) : Column
unixEpochNanosToDotNetTicks( Column ) : Column
هذه الطرق متوفرة أيضًا في بايثون:
# Python
dotnet_ticks_to_timestamp ( column_or_name ) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch ( column_or_name ) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos ( column_or_name ) # returns Unix epoch nanoseconds as LongType
timestamp_to_dotnet_ticks ( column_or_name )
unix_epoch_to_dotnet_ticks ( column_or_name )
unix_epoch_nanos_to_dotnet_ticks ( column_or_name )
Spark Temporary Directory [*] : قم بإنشاء دليل مؤقت سيتم إزالته عند إيقاف تشغيل تطبيق Spark.
سكالا:
import uk . co . gresearch . spark . createTemporaryDir
val dir = createTemporaryDir( " prefix " )
بايثون:
# noinspection PyUnresolvedReferences
from gresearch . spark import *
dir = spark . create_temporary_dir ( "prefix" )
وصف وظيفة Spark [*] : قم بتعيين الوصف الوظيفي لـ Spark لجميع وظائف Spark ضمن السياق.
import uk . co . gresearch . spark . _
implicit val session : SparkSession = spark
withJobDescription( " parquet file " ) {
val df = spark.read.parquet( " data.parquet " )
val count = appendJobDescription( " count " ) {
df.count
}
appendJobDescription( " write " ) {
df.write.csv( " data.csv " )
}
}
بدون وصف وظيفي | مع الوصف الوظيفي |
---|---|
لاحظ أن تعيين وصف في خيط واحد أثناء استدعاء الإجراء (على سبيل المثال .count
) في خيط مختلف لا يعمل، إلا إذا تم إنشاء خيط مختلف من الخيط الحالي بعد تعيين الوصف.
مثال عملي مع المجموعات المتوازية:
import java . util . concurrent . ForkJoinPool
import scala . collection . parallel . CollectionConverters . seqIsParallelizable
import scala . collection . parallel . ForkJoinTaskSupport
val files = Seq ( " data1.csv " , " data2.csv " ).par
val counts = withJobDescription( " Counting rows " ) {
// new thread pool required to spawn new threads from this thread
// so that the job description is actually used
files.tasksupport = new ForkJoinTaskSupport ( new ForkJoinPool ())
files.map(filename => spark.read.csv(filename).count).sum
}(spark)
تتوفر حزمة spark-extension
لجميع إصدارات Spark 3.2 و3.3 و3.4 و3.5. قد تكون بعض إصدارات Spark السابقة مدعومة أيضًا. يحتوي إصدار الحزمة على الدلالات التالية: spark-extension_{SCALA_COMPAT_VERSION}-{VERSION}-{SPARK_COMPAT_VERSION}
:
SCALA_COMPAT_VERSION
: إصدار التوافق الثنائي Scala (الثانوي). متوفر 2.12
و 2.13
.SPARK_COMPAT_VERSION
: إصدار التوافق الثنائي (الثانوي) من Apache Spark. المتوفرة 3.2
و 3.3
و 3.4
و 3.5
.VERSION
: إصدار الحزمة، على سبيل المثال 2.10.0
. أضف هذا السطر إلى ملف build.sbt
الخاص بك:
libraryDependencies + = " uk.co.gresearch.spark " %% " spark-extension " % " 2.13.0-3.5 "
أضف هذه التبعية إلى ملف pom.xml
الخاص بك:
< dependency >
< groupId >uk.co.gresearch.spark</ groupId >
< artifactId >spark-extension_2.12</ artifactId >
< version >2.13.0-3.5</ version >
</ dependency >
أضف هذه التبعية إلى ملف build.gradle
الخاص بك:
dependencies {
implementation " uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 "
}
أرسل تطبيق Spark الخاص بك باستخدام ملحق Spark Extension (الإصدار ≥1.1.0) كما يلي:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [jar]
ملاحظة: اختر إصدار Scala المناسب (هنا 2.12) وإصدار Spark (هنا 3.5) اعتمادًا على إصدار Spark الخاص بك.
قم بتشغيل Spark Shell مع تبعية Spark Extension (الإصدار ≥1.1.0) كما يلي:
spark-shell --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
ملحوظة: اختر إصدار Scala المناسب (هنا 2.12) وإصدار Spark (هنا 3.5) اعتمادًا على إصدار Spark Shell الخاص بك.
ابدأ جلسة PySpark باستخدام ملحق Spark Extension (الإصدار ≥1.1.0) كما يلي:
from pyspark . sql import SparkSession
spark = SparkSession
. builder
. config ( "spark.jars.packages" , "uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5" )
. getOrCreate ()
ملحوظة: اختر إصدار Scala الصحيح (هنا 2.12) وإصدار Spark (هنا 3.5) اعتمادًا على إصدار PySpark الخاص بك.
قم بتشغيل Python Spark REPL مع تبعية Spark Extension (الإصدار ≥1.1.0) كما يلي:
pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
ملحوظة: اختر إصدار Scala الصحيح (هنا 2.12) وإصدار Spark (هنا 3.5) اعتمادًا على إصدار PySpark الخاص بك.
spark-submit
PySpark قم بتشغيل برامج Python النصية التي تستخدم PySpark عبر spark-submit
:
spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5 [script.py]
ملاحظة: اختر إصدار Scala المناسب (هنا 2.12) وإصدار Spark (هنا 3.5) اعتمادًا على إصدار Spark الخاص بك.
قد ترغب في تثبيت حزمة pyspark-extension
python من PyPi في بيئة التطوير الخاصة بك. يوفر لك هذا إمكانات إكمال التعليمات البرمجية والكتابة والاختبار أثناء مرحلة التطوير.
سيظل تشغيل تطبيق Python الخاص بك على مجموعة Spark يتطلب إحدى الطرق المذكورة أعلاه لإضافة حزمة Scala إلى بيئة Spark.
pip install pyspark-extension==2.13.0.3.5
ملحوظة: اختر إصدار Spark المناسب (هنا 3.5) اعتمادًا على إصدار PySpark الخاص بك.
هناك الكثير من دفاتر الملاحظات الخاصة بعلوم البيانات. لاستخدام هذه المكتبة، أضف تبعية jar إلى دفتر ملاحظاتك باستخدام إحداثيات Maven التالية:
uk.co.gresearch.spark:spark-extension_2.12:2.13.0-3.5
أو قم بتنزيل الجرة ووضعها على نظام ملفات حيث يمكن الوصول إليها عن طريق دفتر الملاحظات، ثم قم بالإشارة إلى ملف الجرة مباشرة.
تحقق من وثائق دفتر الملاحظات المفضل لديك لمعرفة كيفية إضافة الجرار إلى بيئة Spark الخاصة بك.
معظم الميزات غير مدعومة في Python بالتزامن مع خادم Spark Connect. وينطبق هذا أيضًا على بيئة Databricks Runtime 13.x والإصدارات الأحدث. التفاصيل يمكن العثور عليها في هذه المدونة.
سيؤدي استدعاء أي من هذه الميزات عند الاتصال بخادم Spark Connect إلى ظهور هذا الخطأ:
This feature is not supported for Spark Connect.
استخدم اتصالًا كلاسيكيًا بمجموعة Spark بدلاً من ذلك.
يمكنك بناء هذا المشروع مقابل إصدارات مختلفة من Spark وScala.
إذا كنت تريد إنشاء إصدار Spark أو Scala مختلف عما تم تعريفه في ملف pom.xml
، فقم بتشغيله
sh set-version.sh [SPARK-VERSION] [SCALA-VERSION]
على سبيل المثال، قم بالتبديل إلى Spark 3.5.0 وScala 2.13.8 عن طريق تشغيل sh set-version.sh 3.5.0 2.13.8
.
ثم قم بتنفيذ mvn package
لإنشاء جرة من المصادر. يمكن العثور عليها في target/
.
قم بإجراء اختبارات Scala عبر mvn test
.
من أجل تشغيل اختبارات Python، قم بإعداد بيئة Python على النحو التالي (استبدل [SCALA-COMPAT-VERSION]
و [SPARK-COMPAT-VERSION]
بالقيم المعنية):
virtualenv -p python3 venv
source venv/bin/activate
pip install -r python/requirements-[SPARK-COMPAT-VERSION]_[SCALA-COMPAT-VERSION].txt
pip install pytest
قم بإجراء اختبارات Python عبر env PYTHONPATH=python:python/test python -m pytest python/test
.
ملاحظة: عليك أولاً إنشاء مصادر Scala.
قم بتشغيل التسلسل التالي من الأوامر في الدليل الجذر للمشروع:
mkdir -p python/pyspark/jars/
cp -v target/spark-extension_ * - * .jar python/pyspark/jars/
pip install build
ثم قم بتنفيذ python -m build python/
لإنشاء whl من المصادر. يمكن العثور عليها في python/dist/
.