يدعم الموصل قراءة جداول Google BigQuery في DataFrames الخاصة بـ Spark، وكتابة DataFrames مرة أخرى في BigQuery. ويتم ذلك عن طريق استخدام Spark SQL Data Source API للتواصل مع BigQuery.
تعمل واجهة برمجة تطبيقات التخزين على تدفق البيانات بشكل متوازٍ مباشرةً من BigQuery عبر gRPC دون استخدام Google Cloud Storage كوسيط.
يتمتع بعدد من المزايا مقارنة باستخدام تدفق القراءة السابق القائم على التصدير والذي من المفترض أن يؤدي بشكل عام إلى أداء قراءة أفضل:
لا يترك أي ملفات مؤقتة في Google Cloud Storage. تتم قراءة الصفوف مباشرةً من خوادم BigQuery باستخدام تنسيقات Arrow أو Avro wire.
تسمح واجهة برمجة التطبيقات الجديدة بتصفية الأعمدة والمسندات لقراءة البيانات التي تهمك فقط.
نظرًا لأن BigQuery مدعوم بمخزن بيانات عمودي، فيمكنه دفق البيانات بكفاءة دون قراءة جميع الأعمدة.
تدعم واجهة برمجة تطبيقات التخزين عملية الضغط العشوائي للمرشحات الأصلية. يدعم إصدار الموصل 0.8.0-beta وما فوقه الضغط لأسفل للمرشحات العشوائية إلى Bigquery.
هناك مشكلة معروفة في Spark وهي عدم السماح بالضغط على المرشحات في الحقول المتداخلة. على سبيل المثال - لن يتم الضغط على المرشحات مثل address.city = "Sunnyvale"
للأسفل إلى Bigquery.
تقوم واجهة برمجة التطبيقات (API) بإعادة توازن السجلات بين القراء حتى يكتملوا جميعًا. وهذا يعني أن جميع مراحل الخريطة ستنتهي بشكل متزامن تقريبًا. راجع مقالة المدونة هذه حول كيفية استخدام التقسيم الديناميكي بالمثل في Google Cloud Dataflow.
راجع تكوين التقسيم لمزيد من التفاصيل.
اتبع هذه التعليمات.
إذا لم تكن لديك بيئة Apache Spark، فيمكنك إنشاء مجموعة Cloud Dataproc بمصادقة مكونة مسبقًا. تفترض الأمثلة التالية أنك تستخدم Cloud Dataproc، ولكن يمكنك استخدام spark-submit
على أي مجموعة.
تحتاج أي مجموعة Dataproc تستخدم واجهة برمجة التطبيقات (API) إلى نطاقات "bigquery" أو "cloud-platform". تحتوي مجموعات Dataproc على نطاق "bigquery" بشكل افتراضي، لذلك يجب أن تعمل معظم المجموعات في المشاريع الممكّنة بشكل افتراضي، على سبيل المثال
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
أحدث إصدار من الموصل متاح للعامة في الروابط التالية:
إصدار | وصلة |
---|---|
سبارك 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (رابط HTTP) |
سبارك 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (رابط HTTP) |
سبارك 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (رابط HTTP) |
سبارك 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (رابط HTTP) |
سبارك 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (رابط HTTP) |
سبارك 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (رابط HTTP) |
سكالا 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (رابط HTTP) |
سكالا 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (رابط HTTP) |
سكالا 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (رابط HTTP) |
الإصدارات الستة الأولى عبارة عن موصلات تعتمد على Java وتستهدف Spark 2.4/3.1/3.2/3.3/3.4/3.5 لجميع إصدارات Scala المبنية على واجهات برمجة تطبيقات مصدر البيانات الجديدة (Data Source API v2) من Spark.
الموصلان الأخيران عبارة عن موصلات تعتمد على Scala، يرجى استخدام الجرة ذات الصلة بتثبيت Spark الخاص بك كما هو موضح أدناه.
موصل شرارة | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
شرارة-3.5-bigquery | ✓ | |||||||
شرارة-3.4-bigquery | ✓ | ✓ | ||||||
شرارة-3.3-bigquery | ✓ | ✓ | ✓ | |||||
شرارة-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
شرارة-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
شرارة-2.4-bigquery | ✓ | |||||||
spark-bigquery-with-dependeency_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
spark-bigquery-with-dependeency_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
spark-bigquery-with-dependeency_2.11 | ✓ | ✓ |
موصل صورة Dataproc | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | بدون خادم الصورة 1.0 | بدون خادم الصورة 2.0 | بدون خادم الصورة 2.1 | بدون خادم الصورة 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
شرارة-3.5-bigquery | ✓ | ✓ | ||||||||
شرارة-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
شرارة-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
شرارة-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
شرارة-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
شرارة-2.4-bigquery | ✓ | ✓ | ||||||||
spark-bigquery-with-dependeency_2.13 | ✓ | ✓ | ✓ | |||||||
spark-bigquery-with-dependeency_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
spark-bigquery-with-dependeency_2.11 | ✓ | ✓ |
الموصل متاح أيضًا من مستودع Maven المركزي. يمكن استخدامه باستخدام خيار --packages
أو خاصية التكوين spark.jars.packages
. استخدم القيمة التالية
إصدار | قطعة أثرية موصل |
---|---|
سبارك 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
سبارك 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
سبارك 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
سبارك 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
سبارك 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
سبارك 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
سكالا 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
سكالا 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
سكالا 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
مجموعات Dataproc التي تم إنشاؤها باستخدام الصورة 2.1 وما فوق، أو الدُفعات باستخدام خدمة Dataproc بدون خادم تأتي مزودة بموصل Spark BigQuery المدمج. لن يساعد استخدام --jars
أو --packages
(أو بدلاً من ذلك، إعدادات spark.jars
/ spark.jars.packages
) في هذه الحالة لأن الموصل المدمج له الأسبقية.
لاستخدام إصدار آخر غير الإصدار المدمج، يرجى القيام بأحد الإجراءات التالية:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
أو --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
لإنشاء المجموعة بجرة مختلفة. يمكن أن يشير عنوان URL إلى أي موصل JAR صالح لإصدار Spark الخاص بالمجموعة.--properties dataproc.sparkBqConnector.version=0.41.0
أو --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
لإنشاء الدفعة بجرة مختلفة. يمكن أن يشير عنوان URL إلى أي موصل JAR صالح لإصدار Spark الخاص بوقت التشغيل. يمكنك تشغيل عدد كلمات PySpark بسيط مقابل واجهة برمجة التطبيقات (API) بدون تجميع عن طريق التشغيل
صورة Dataproc 1.5 وما فوق
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
صورة Dataproc 1.4 وما دونها
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
يستخدم الموصل واجهة برمجة تطبيقات Spark SQL Data Source API:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
أو Scala API الضمنية فقط:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
لمزيد من المعلومات، راجع نماذج التعليمات البرمجية الإضافية في Python وScala وJava.
يتيح لك الموصل تشغيل أي استعلام SQL SELECT قياسي على BigQuery وجلب نتائجه مباشرة إلى Spark Dataframe. ويتم ذلك بسهولة كما هو موضح في نموذج التعليمات البرمجية التالي:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
مما يؤدي إلى النتيجة
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
الخيار الثاني هو استخدام خيار query
مثل هذا:
df = spark.read.format("bigquery").option("query", sql).load()
لاحظ أن التنفيذ يجب أن يكون أسرع حيث يتم نقل النتيجة فقط عبر السلك. وبطريقة مماثلة، يمكن أن تتضمن الاستعلامات عمليات الصلات بشكل أكثر كفاءة ثم تشغيل الصلات على Spark أو استخدام ميزات BigQuery الأخرى مثل الاستعلامات الفرعية والوظائف المحددة من قبل مستخدم BigQuery وجداول أحرف البدل وBigQuery ML والمزيد.
لاستخدام هذه الميزة، يجب ضبط التكوينات التالية:
viewsEnabled
على true
.materializationDataset
على مجموعة بيانات حيث يكون لدى مستخدم Google Cloud Platform إذن إنشاء الجدول. materializationProject
اختياري. ملاحظة: كما هو مذكور في وثائق BigQuery، يجب أن تكون الجداول التي تم الاستعلام عنها في نفس موقع materializationDataset
. أيضًا، إذا كانت الجداول الموجودة في SQL statement
من مشاريع أخرى غير parentProject
، فاستخدم اسم الجدول المؤهل بالكامل، أي [project].[dataset].[table]
.
هام: يتم تنفيذ هذه الميزة عن طريق تشغيل الاستعلام على BigQuery وحفظ النتيجة في جدول مؤقت، حيث سيقرأ Spark النتائج منه. وقد يؤدي هذا إلى إضافة تكاليف إضافية إلى حسابك على BigQuery.
يحتوي الموصل على دعم أولي للقراءة من طرق عرض BigQuery. يرجى ملاحظة أن هناك بعض التحذيرات:
collect()
أو count()
.materializationProject
و materializationDataset
الاختياريين، على التوالي. يمكن أيضًا تعيين هذه الخيارات عالميًا عن طريق استدعاء spark.conf.set(...)
قبل قراءة طرق العرض..option("viewsEnabled", "true")
) أو قم بتعيينه بشكل عام عن طريق استدعاء spark.conf.set("viewsEnabled", "true")
.materializationDataset
في نفس موقع العرض.يمكن كتابة DataFrames إلى BigQuery باستخدام طريقتين: المباشرة وغير المباشرة.
في هذه الطريقة، تتم كتابة البيانات مباشرة إلى BigQuery باستخدام BigQuery Storage Write API. لتمكين هذا الخيار، يرجى ضبط خيار writeMethod
على direct
، كما هو موضح أدناه:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
يتم دعم الكتابة إلى الجداول المقسمة الموجودة (مقسمة التاريخ، ومقسمة وقت العرض، ومقسمة النطاق) في وضع حفظ APPEND ووضع OVERWRITE (مقسم التاريخ والنطاق فقط) بشكل كامل بواسطة الموصل وواجهة برمجة التطبيقات BigQuery Storage Write API. إن استخدام datePartition
و partitionField
و partitionType
و partitionRangeStart
و partitionRangeEnd
و partitionRangeInterval
الموضح أدناه غير مدعوم في الوقت الحالي بواسطة طريقة الكتابة المباشرة.
هام: يرجى الرجوع إلى صفحة تسعير استيعاب البيانات فيما يتعلق بتسعير BigQuery Storage Write API.
هام: يرجى استخدام الإصدار 0.24.2 وما فوق للكتابة المباشرة، حيث أن الإصدارات السابقة تحتوي على خطأ قد يتسبب في حذف الجدول في حالات معينة.
في هذه الطريقة، تتم كتابة البيانات أولاً إلى GCS، ثم يتم تحميلها إلى BigQuery. يجب تكوين مجموعة GCS للإشارة إلى موقع البيانات المؤقت.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
يتم تخزين البيانات مؤقتًا باستخدام تنسيقات Apache Parquet أو Apache ORC أو Apache Avro.
يمكن أيضًا تعيين مجموعة GCS والتنسيق عالميًا باستخدام Spark's RuntimeConfig مثل هذا:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
عند دفق DataFrame إلى BigQuery، تتم كتابة كل دفعة بنفس طريقة DataFrame غير المتدفقة. لاحظ أنه يجب تحديد موقع نقطة تفتيش متوافق مع HDFS (على سبيل المثال: path/to/HDFS/dir
أو gs://checkpoint-bucket/checkpointDir
).
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
هام: لا يقوم الموصل بتكوين موصل GCS لتجنب التعارض مع موصل GCS آخر، إذا كان موجودًا. من أجل استخدام إمكانيات الكتابة للموصل، يرجى تكوين موصل GCS على مجموعتك كما هو موضح هنا.
تدعم واجهة برمجة التطبيقات عددًا من الخيارات لتكوين القراءة
<style> table#propertytable td، الجدول {word-break:break-word } </style>ملكية | معنى | الاستخدام |
---|---|---|
table | جدول BigQuery بتنسيق [[project:]dataset.]table . يوصى باستخدام معلمة path الخاصة load() / save() بدلاً من ذلك. لقد تم إهمال هذا الخيار وستتم إزالته في إصدار مستقبلي.(موقوف) | القراءة/الكتابة |
dataset | مجموعة البيانات التي تحتوي على الجدول. يجب استخدام هذا الخيار مع الجدول وطرق العرض القياسية، ولكن ليس عند تحميل نتائج الاستعلام. (اختياري ما لم يتم حذفه في table ) | القراءة/الكتابة |
project | معرف مشروع Google Cloud للجدول. يجب استخدام هذا الخيار مع الجدول وطرق العرض القياسية، ولكن ليس عند تحميل نتائج الاستعلام. (اختياري. الإعدادات الافتراضية لمشروع حساب الخدمة المستخدم) | القراءة/الكتابة |
parentProject | معرف مشروع Google Cloud للجدول الذي سيتم إصدار فاتورة به لعملية التصدير. (اختياري. الإعدادات الافتراضية لمشروع حساب الخدمة المستخدم) | القراءة/الكتابة |
maxParallelism | الحد الأقصى لعدد الأقسام التي سيتم تقسيم البيانات إليها. قد يكون العدد الفعلي أقل إذا رأت BigQuery أن البيانات صغيرة بدرجة كافية. إذا لم يكن هناك ما يكفي من المنفذين لجدولة قارئ لكل قسم، فقد تكون بعض الأقسام فارغة. هام: لا تزال المعلمة القديمة ( parallelism ) مدعومة ولكن في الوضع المهمل. ستتم إزالته في الإصدار 1.0 من الموصل.(اختياري. القيمة الافتراضية هي الأكبر من MinParallelism المفضل و20000).) | يقرأ |
preferredMinParallelism | الحد الأدنى المفضل لعدد الأقسام لتقسيم البيانات إليها. قد يكون العدد الفعلي أقل إذا رأت BigQuery أن البيانات صغيرة بدرجة كافية. إذا لم يكن هناك ما يكفي من المنفذين لجدولة قارئ لكل قسم، فقد تكون بعض الأقسام فارغة. (اختياري. القيمة الافتراضية هي الأصغر من 3 أضعاف التوازي الافتراضي للتطبيق والحد الأقصى للتوازي.) | يقرأ |
viewsEnabled | تمكين الموصل من القراءة من طرق العرض وليس من الجداول فقط. يرجى قراءة القسم ذي الصلة قبل تفعيل هذا الخيار. (اختياري. الإعداد الافتراضي هو false ) | يقرأ |
materializationProject | معرف المشروع حيث سيتم إنشاء العرض الفعلي (اختياري. الإعدادات الافتراضية لمعرف مشروع العرض) | يقرأ |
materializationDataset | مجموعة البيانات التي سيتم إنشاء العرض الفعلي فيها. يجب أن تكون مجموعة البيانات هذه في نفس موقع العرض أو الجداول التي تم الاستعلام عنها. (اختياري. الإعدادات الافتراضية لمجموعة بيانات العرض) | يقرأ |
materializationExpirationTimeInMinutes | وقت انتهاء صلاحية الجدول المؤقت الذي يحتوي على البيانات الفعلية للعرض أو الاستعلام، بالدقائق. لاحظ أن الموصل قد يعيد استخدام الجدول المؤقت بسبب استخدام ذاكرة التخزين المؤقت المحلية ومن أجل تقليل حساب BigQuery، لذلك قد تتسبب القيم المنخفضة جدًا في حدوث أخطاء. يجب أن تكون القيمة عددًا صحيحًا موجبًا. (اختياري. الإعدادات الافتراضية هي 1440، أو 24 ساعة) | يقرأ |
readDataFormat | تنسيق البيانات للقراءة من BigQuery. الخيارات: ARROW ، AVRO (اختياري. الإعدادات الافتراضية هي ARROW ) | يقرأ |
optimizedEmptyProjection | يستخدم الموصل منطق إسقاط فارغًا محسّنًا (حدد بدون أي أعمدة)، يُستخدم لتنفيذ count() . يأخذ هذا المنطق البيانات مباشرة من بيانات تعريف الجدول أو ينفذ عملية `SELECT COUNT(*) WHERE...` فعالة للغاية في حالة وجود مرشح. يمكنك إلغاء استخدام هذا المنطق عن طريق ضبط هذا الخيار على false .(اختياري، الافتراضي هو true ) | يقرأ |
pushAllFilters | إذا تم التعيين على true ، فسيدفع الموصل جميع المرشحات التي يمكن لـ Spark تفويضها إلى BigQuery Storage API. وهذا يقلل من كمية البيانات التي يلزم إرسالها من خوادم BigQuery Storage API إلى عملاء Spark. لقد تم إهمال هذا الخيار وستتم إزالته في إصدار مستقبلي.(اختياري، الافتراضي هو true )(موقوف) | يقرأ |
bigQueryJobLabel | يمكن استخدامه لإضافة تسميات إلى الاستعلام الذي بدأه الموصل وتحميل مهام BigQuery. يمكن تعيين تسميات متعددة. (خياري) | يقرأ |
bigQueryTableLabel | يمكن استخدامه لإضافة تسميات إلى الجدول أثناء الكتابة إلى الجدول. يمكن تعيين تسميات متعددة. (خياري) | يكتب |
traceApplicationName | اسم التطبيق المستخدم لتتبع جلسات القراءة والكتابة في BigQuery Storage. يلزم تعيين اسم التطبيق لتعيين معرف التتبع في الجلسات. (خياري) | يقرأ |
traceJobId | معرف الوظيفة المستخدم لتتبع جلسات القراءة والكتابة في BigQuery Storage. (اختياري، الإعدادات الافتراضية لمعرف مهمة Dataproc موجودة، وإلا سيتم استخدام معرف تطبيق Spark) | يقرأ |
createDisposition | يحدد ما إذا كان مسموحًا للمهمة بإنشاء جداول جديدة. القيم المسموح بها هي:
(اختياري. الافتراضي هو CREATE_IF_NEEDED). | يكتب |
writeMethod | يتحكم في الطريقة التي تتم بها كتابة البيانات إلى BigQuery. القيم المتاحة direct لاستخدام BigQuery Storage Write API indirect والتي تكتب البيانات أولاً إلى GCS ثم تقوم بتشغيل عملية تحميل BigQuery. رؤية المزيد هنا(اختياري، الافتراضي هو indirect ) | يكتب |
writeAtLeastOnce | يضمن كتابة البيانات إلى BigQuery مرة واحدة على الأقل. وهذا ضمان أقل من مرة واحدة بالضبط. يعد هذا مناسبًا لسيناريوهات الدفق التي تتم فيها كتابة البيانات بشكل مستمر على دفعات صغيرة. (اختياري. الإعداد الافتراضي هو false )مدعوم فقط بطريقة الكتابة "المباشرة" والوضع ليس "الكتابة الفوقية". | يكتب |
temporaryGcsBucket | مجموعة GCS التي تحتفظ بالبيانات مؤقتًا قبل تحميلها إلى BigQuery. مطلوب ما لم يتم تعيينه في تكوين Spark ( spark.conf.set(...) ).غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
persistentGcsBucket | مجموعة GCS التي تحتوي على البيانات قبل تحميلها إلى BigQuery. إذا تم إعلامك بذلك، فلن يتم حذف البيانات بعد كتابة البيانات في BigQuery. غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
persistentGcsPath | مسار GCS الذي يحتفظ بالبيانات قبل تحميلها إلى BigQuery. يستخدم فقط مع persistentGcsBucket .غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
intermediateFormat | تنسيق البيانات قبل تحميلها إلى BigQuery، يمكن أن تكون القيم إما "parquet" أو "orc" أو "avro". من أجل استخدام تنسيق Avro، يجب إضافة حزمة spark-avro في وقت التشغيل. (اختياري. الإعدادات الافتراضية parquet ). على الكتابة فقط. مدعوم فقط لأسلوب الكتابة "غير المباشر". | يكتب |
useAvroLogicalTypes | عند التحميل من Avro (`.option("intermediateFormat"، "avro")`)، يستخدم BigQuery أنواع Avro الأساسية بدلاً من الأنواع المنطقية [افتراضيًا](https://cloud.google.com/bigquery/docs/ تحميل-بيانات-التخزين السحابي-avro#logic_types). يؤدي توفير هذا الخيار إلى تحويل أنواع Avro المنطقية إلى أنواع بيانات BigQuery المقابلة لها. (اختياري. القيمة الافتراضية هي false ). على الكتابة فقط. | يكتب |
datePartition | قسم التاريخ الذي سيتم كتابة البيانات إليه. يجب أن تكون سلسلة تاريخ مقدمة بالتنسيق YYYYMMDD . يمكن استخدامها للكتابة فوق بيانات قسم واحد، مثل هذا:
(خياري). على الكتابة فقط. يمكن استخدامه أيضًا مع أنواع الأقسام المختلفة مثل: ساعة: YYYYMMDDHH الشهر: YYYYMM السنة: YYYY غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
partitionField | إذا تم تحديد هذا الحقل، فسيتم تقسيم الجدول حسب هذا الحقل. بالنسبة لتقسيم الوقت، قم بتحديده مع الخيار `partitionType`. بالنسبة لتقسيم النطاق الصحيح، حدده مع الخيارات الثلاثة: `partitionRangeStart`، `partitionRangeEnd، `partitionRangeInterval`. يجب أن يكون الحقل عبارة عن حقل TIMESTAMP أو DATE ذي المستوى الأعلى لتقسيم الوقت، أو INT64 لتقسيم النطاق الصحيح. يجب أن يكون وضعه NULLABLE أو REQUIRED . إذا لم يتم تعيين الخيار لجدول مقسم زمنيًا، فسيتم تقسيم الجدول بواسطة عمود زائف، تتم الإشارة إليه إما عبر '_PARTITIONTIME' as TIMESTAMP ، أو '_PARTITIONDATE' as DATE .(خياري). غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
partitionExpirationMs | عدد المللي ثانية اللازمة للاحتفاظ بتخزين الأقسام في الجدول. سيكون للتخزين في القسم وقت انتهاء صلاحية وقت القسم الخاص به بالإضافة إلى هذه القيمة. (خياري). غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
partitionType | يستخدم لتحديد تقسيم الوقت. الأنواع المدعومة هي: HOUR, DAY, MONTH, YEAR يعد هذا الخيار إلزاميًا حتى يتم تقسيم الجدول الهدف إلى وقت. (اختياري. الافتراضي هو DAY إذا تم تحديد PartitionField). غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
partitionRangeStart ، partitionRangeEnd ، partitionRangeInterval | يستخدم لتحديد تقسيم النطاق الصحيح. تعتبر هذه الخيارات إلزامية لتقسيم الجدول الهدف إلى نطاق صحيح. يجب تحديد جميع الخيارات الثلاثة. غير مدعوم بطريقة الكتابة "DIRECT". | يكتب |
clusteredFields | سلسلة من أعمدة المستوى الأعلى غير المتكررة مفصولة بفاصلة. (خياري). | يكتب |
allowFieldAddition | إضافة ALLOW_FIELD_ADDITION SchemaUpdateOption إلى BigQuery LoadJob. القيم المسموح بها true false .(اختياري. الافتراضي هو false ).مدعوم فقط بطريقة الكتابة "غير المباشرة". | يكتب |
allowFieldRelaxation | إضافة ALLOW_FIELD_RELAXATION SchemaUpdateOption إلى BigQuery LoadJob. القيم المسموح بها true false .(اختياري. الافتراضي هو false ).مدعوم فقط بطريقة الكتابة "غير المباشرة". | يكتب |
proxyAddress | عنوان الخادم الوكيل. يجب أن يكون الوكيل وكيل HTTP ويجب أن يكون العنوان بتنسيق "المضيف: المنفذ". يمكن تعيينه بدلاً من ذلك في تكوين Spark ( spark.conf.set(...) ) أو في تكوين Hadoop ( fs.gs.proxy.address ).(اختياري. مطلوب فقط في حالة الاتصال بـ Google Cloud Platform عبر الخادم الوكيل.) | القراءة/الكتابة |
proxyUsername | اسم المستخدم المستخدم للاتصال بالوكيل. يمكن تعيينه بدلاً من ذلك في تكوين Spark ( spark.conf.set(...) ) أو في تكوين Hadoop ( fs.gs.proxy.username ).(اختياري. مطلوب فقط في حالة الاتصال بـ Google Cloud Platform عبر الخادم الوكيل مع المصادقة.) | القراءة/الكتابة |
proxyPassword | كلمة المرور المستخدمة للاتصال بالوكيل. يمكن تعيينه بدلاً من ذلك في تكوين Spark ( spark.conf.set(...) ) أو في تكوين Hadoop ( fs.gs.proxy.password ).(اختياري. مطلوب فقط في حالة الاتصال بـ Google Cloud Platform عبر الخادم الوكيل مع المصادقة.) | القراءة/الكتابة |
httpMaxRetry | الحد الأقصى لعدد مرات إعادة المحاولة لطلبات HTTP ذات المستوى المنخفض إلى BigQuery. يمكن تعيينه بدلاً من ذلك في تكوين Spark ( spark.conf.set("httpMaxRetry", ...) ) أو في تكوين Hadoop ( fs.gs.http.max.retry ).(اختياري. الافتراضي هو 10) | القراءة/الكتابة |
httpConnectTimeout | المهلة بالمللي ثانية لإنشاء اتصال مع BigQuery. يمكن تعيينه بدلاً من ذلك في تكوين Spark ( spark.conf.set("httpConnectTimeout", ...) ) أو في تكوين Hadoop ( fs.gs.http.connect-timeout ).(اختياري. الافتراضي هو 60000 مللي ثانية 0 للمهلة اللانهائية، رقم سالب لـ 20000) | القراءة/الكتابة |
httpReadTimeout | المهلة بالمللي ثانية لقراءة البيانات من اتصال تم إنشاؤه. يمكن تعيينه بدلاً من ذلك في تكوين Spark ( spark.conf.set("httpReadTimeout", ...) ) أو في تكوين Hadoop ( fs.gs.http.read-timeout ).(اختياري. الافتراضي هو 60000 مللي ثانية 0 للمهلة اللانهائية، رقم سالب لـ 20000) | يقرأ |
arrowCompressionCodec | برنامج ترميز الضغط أثناء القراءة من جدول BigQuery عند استخدام تنسيق Arrow. الخيارات: ZSTD (Zstandard compression) ، LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) ، COMPRESSION_UNSPECIFIED . برنامج ترميز الضغط الموصى به هو ZSTD أثناء استخدام Java.(اختياري. الإعدادات الافتراضية هي COMPRESSION_UNSPECIFIED مما يعني أنه لن يتم استخدام أي ضغط) | يقرأ |
responseCompressionCodec | برنامج ترميز الضغط المستخدم لضغط بيانات ReadRowsResponse. الخيارات: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED ، RESPONSE_COMPRESSION_CODEC_LZ4 (اختياري. الإعدادات الافتراضية هي RESPONSE_COMPRESSION_CODEC_UNSPECIFIED مما يعني أنه لن يتم استخدام أي ضغط) | يقرأ |
cacheExpirationTimeInMinutes | وقت انتهاء صلاحية ذاكرة التخزين المؤقت في الذاكرة التي تقوم بتخزين معلومات الاستعلام. لتعطيل التخزين المؤقت، اضبط القيمة على 0. (اختياري. الافتراضي هو 15 دقيقة) | يقرأ |
enableModeCheckForSchemaFields | التحقق من أن وضع كل حقل في مخطط الوجهة يساوي الوضع في مخطط الحقل المصدر المقابل، أثناء الكتابة المباشرة. القيمة الافتراضية صحيحة، أي أن الفحص يتم بشكل افتراضي. إذا تم التعيين على خطأ، فسيتم تجاهل التحقق من الوضع. | يكتب |
enableListInference | يشير إلى ما إذا كان سيتم استخدام استنتاج المخطط على وجه التحديد عندما يكون الوضع Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). الافتراضيات كاذبة. | يكتب |
bqChannelPoolSize | الحجم (الثابت) لتجمع قنوات gRPC الذي تم إنشاؤه بواسطة BigQueryReadClient. للحصول على الأداء الأمثل، يجب ضبط ذلك على عدد النوى على الأقل في منفذي المجموعة. | يقرأ |
createReadSessionTimeoutInSeconds | المهلة بالثواني لإنشاء جلسة قراءة عند قراءة جدول. بالنسبة للجدول الكبير جدًا، يجب زيادة هذه القيمة. (اختياري. الافتراضي هو 600 ثانية) | يقرأ |
queryJobPriority | تم تعيين مستويات الأولوية للوظيفة أثناء قراءة البيانات من استعلام BigQuery. القيم المسموح بها هي:
(اختياري. الإعدادات الافتراضية هي INTERACTIVE ) | القراءة/الكتابة |
destinationTableKmsKeyName | يصف مفتاح تشفير Cloud KMS الذي سيتم استخدامه لحماية جدول BigQuery الوجهة. يتطلب حساب خدمة BigQuery المرتبط بمشروعك الوصول إلى مفتاح التشفير هذا. لمزيد من المعلومات حول استخدام CMEK مع BigQuery، راجع [هنا](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id). ملاحظة: سيتم تشفير الجدول بواسطة المفتاح فقط إذا تم إنشاؤه بواسطة الموصل. لن يتم تشفير الجدول غير المشفر الموجود مسبقًا بمجرد تعيين هذا الخيار. (خياري) | يكتب |
allowMapTypeConversion | التكوين المنطقي لتعطيل التحويل من سجلات BigQuery إلى Spark MapType عندما يحتوي السجل على حقلين فرعيين بأسماء الحقول key value . القيمة الافتراضية true مما يسمح بالتحويل.(خياري) | يقرأ |
spark.sql.sources.partitionOverwriteMode | التكوين لتحديد وضع الكتابة الفوقية عند الكتابة عندما يكون الجدول مقسمًا إلى نطاق/وقت. يدعم حاليًا وضعين: STATIC و DYNAMIC . في الوضع STATIC ، تتم الكتابة فوق الجدول بأكمله. في الوضع DYNAMIC ، تتم الكتابة فوق البيانات بواسطة أقسام الجدول الموجود. القيمة الافتراضية هي STATIC .(خياري) | يكتب |
enableReadSessionCaching | التكوين المنطقي لتعطيل التخزين المؤقت لجلسة القراءة. يقوم BigQuery بتخزين جلسات القراءة للسماح بتخطيط استعلام Spark بشكل أسرع. القيمة الافتراضية true .(خياري) | يقرأ |
readSessionCacheDurationMins | قم بالتكوين لتعيين مدة التخزين المؤقت لجلسة القراءة بالدقائق. يعمل فقط إذا كان enableReadSessionCaching true (افتراضي). يسمح بتحديد مدة تخزين جلسات القراءة مؤقتًا. الحد الأقصى للقيمة المسموح بها هو 300 . القيمة الافتراضية هي 5 .(خياري) | يقرأ |
bigQueryJobTimeoutInMinutes | قم بالتكوين لتعيين مهلة مهمة BigQuery بالدقائق. القيمة الافتراضية هي 360 دقيقة.(خياري) | القراءة/الكتابة |
snapshotTimeMillis | طابع زمني محدد بالمللي ثانية لاستخدامه في قراءة لقطة جدول. بشكل افتراضي، لا يتم تعيين هذا ويتم قراءة الإصدار الأحدث من الجدول. (خياري) | يقرأ |
bigNumericDefaultPrecision | دقة افتراضية بديلة لحقول BigNumeric، نظرًا لأن الإعداد الافتراضي BigQuery واسع جدًا بالنسبة لـ Spark. يمكن أن تتراوح القيم بين 1 و38. يتم استخدام هذا الإعداد الافتراضي فقط عندما يحتوي الحقل على نوع BigNumeric غير محدد المعلمات. يرجى ملاحظة أنه قد يكون هناك فقدان للبيانات إذا كانت دقة البيانات الفعلية أكبر مما هو محدد. (خياري) | القراءة/الكتابة |
bigNumericDefaultScale | مقياس افتراضي بديل للحقول BigNumeric. يمكن أن تتراوح القيم بين 0 و38، وأقل من bigNumericFieldsPrecision. يتم استخدام هذا الإعداد الافتراضي فقط عندما يحتوي الحقل على نوع BigNumeric غير محدد المعلمات. يرجى ملاحظة أنه قد يكون هناك فقدان للبيانات إذا كان مقياس البيانات الفعلي أكبر مما هو محدد. (خياري) | القراءة/الكتابة |
يمكن أيضًا تعيين الخيارات خارج الكود، باستخدام المعلمة --conf
للمعلمة spark-submit
أو --properties
الخاصة بالمعلمة gcloud dataproc submit spark
. من أجل استخدام هذا، ألحق البادئة spark.datasource.bigquery.
إلى أي من الخيارات، على سبيل المثال spark.conf.set("temporaryGcsBucket", "some-bucket")
يمكن أيضًا تعيينه كـ --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
.
باستثناء DATETIME
و TIME
، يتم توجيه جميع أنواع بيانات BigQuery إلى نوع بيانات Spark SQL المقابل. فيما يلي كافة التعيينات:
نوع بيانات SQL القياسية في BigQuery | شرارة SQL نوع البيانات | ملحوظات |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | يرجى الرجوع إلى الدعم الرقمي والرقمي الكبير |
BIGNUMERIC | DecimalType | يرجى الرجوع إلى الدعم الرقمي والرقمي الكبير |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType ، TimestampNTZType * | لا يحتوي Spark على نوع DATETIME. يمكن كتابة سلسلة الإشارة إلى عمود BQ DATETIME الموجود بشرط أن يكون بتنسيق BQ DATETIME الحرفي. * بالنسبة إلى Spark 3.4+، تتم قراءة BQ DATETIME كنوع Spark's TimestampNTZ، أي Java LocalDateTime |
TIME | LongType ، StringType * | Spark ليس لها نوع TIME. يمكن إرسال الأطوال التي تم إنشاؤها، والتي تشير إلى ميكروثانية منذ منتصف الليل، بأمان إلى TimestampType، ولكن هذا يتسبب في استنتاج التاريخ على أنه اليوم الحالي. وبالتالي يتم ترك الأوقات لفترة طويلة ويمكن للمستخدم الإدلاء بها إذا أراد ذلك. عند الإرسال إلى الطابع الزمني، يواجه TIME نفس مشكلات المنطقة الزمنية مثل DATETIME * يمكن كتابة السلسلة الوامضة إلى عمود BQ TIME الموجود بشرط أن تكون بتنسيق BQ TIME الحرفي. |
JSON | StringType | Spark ليس لها نوع JSON. تتم قراءة القيم كسلسلة. من أجل إعادة كتابة JSON إلى BigQuery، يلزم توفر الشروط التالية:
|
ARRAY<STRUCT<key,value>> | MapType | لا يحتوي BigQuery على نوع MAP، وبالتالي، على غرار التحويلات الأخرى مثل Apache Avro وBigQuery Load jobs، يقوم الموصل بتحويل Spark Map إلى REPEATED STRUCT<key,value>. وهذا يعني أنه على الرغم من توفر كتابة الخرائط وقراءتها، فإن تشغيل SQL على BigQuery الذي يستخدم دلالات الخريطة غير مدعوم. للإشارة إلى قيم الخريطة باستخدام BigQuery SQL، يرجى مراجعة وثائق BigQuery. وبسبب حالات عدم التوافق هذه، يتم تطبيق بعض القيود:
|
يتم دعم Spark ML Vector وMatrix، بما في ذلك الإصدارات الكثيفة والمتفرقة. يتم حفظ البيانات كسجل BigQuery. لاحظ أنه تمت إضافة لاحقة إلى وصف الحقل الذي يتضمن نوع شرارة الحقل.
لكتابة هذه الأنواع إلى BigQuery، استخدم تنسيق ORC أو Avro المتوسط، واجعلها كعمود في الصف (أي ليس حقلاً في البنية).
تبلغ دقة BigNumeric في BigQuery 76.76 (الرقم 77 جزئي) ومقياس 38. نظرًا لأن هذه الدقة والمقياس يتجاوز دعم DecimalType الخاص بـ spark (مقياس 38 ودقة 38)، فهذا يعني أنه لا يمكن استخدام حقول BigNumeric بدقة أكبر من 38. . بمجرد تحديث قيود Spark هذه، سيتم تحديث الموصل وفقًا لذلك.
يحاول تحويل Spark Decimal/BigQuery Numeric الحفاظ على معلمات النوع، أي سيتم تحويل NUMERIC(10,2)
إلى Decimal(10,2)
والعكس صحيح. لكن لاحظ أن هناك حالات يتم فيها فقدان المعلمات. وهذا يعني أنه سيتم إرجاع المعلمات إلى الإعدادات الافتراضية - NUMERIC (38,9) وBIGNUMERIC(76,38). وهذا يعني أنه في الوقت الحالي، يتم دعم قراءة BigNumeric فقط من جدول قياسي، ولكن ليس من طريقة عرض BigQuery أو عند قراءة البيانات من استعلام BigQuery.
يقوم الموصل تلقائيًا بحساب العمود والضغط لأسفل لتصفية عبارة SELECT
الخاصة بـ DataFrame على سبيل المثال
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
مرشحات word
العمود ودفعت لأسفل عامل التصفية المسند word = 'hamlet' or word = 'Claudius'
.
إذا كنت لا ترغب في تقديم طلبات قراءة متعددة إلى BigQuery، فيمكنك تخزين DataFrame مؤقتًا قبل التصفية، على سبيل المثال:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
يمكنك أيضًا تحديد خيار filter
يدويًا، والذي سيتجاوز الضغط التلقائي للأسفل وسيقوم Spark ببقية التصفية في العميل.
الأعمدة الزائفة _PARTITIONDATE و_PARTITIONTIME ليست جزءًا من مخطط الجدول. لذلك، من أجل الاستعلام عن أقسام الجداول المقسمة، لا تستخدم طريقة Where() الموضحة أعلاه. بدلاً من ذلك، أضف خيار التصفية بالطريقة التالية:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
افتراضيًا، يقوم الموصل بإنشاء قسم واحد لكل 400 ميجابايت في الجدول الذي تتم قراءته (قبل التصفية). يجب أن يتوافق هذا تقريبًا مع الحد الأقصى لعدد أجهزة القراءة التي تدعمها BigQuery Storage API. يمكن تكوين هذا بشكل صريح باستخدام خاصية maxParallelism
. قد يحد BigQuery من عدد الأقسام بناءً على قيود الخادم.
من أجل دعم تتبع استخدام موارد BigQuery، توفر الموصلات الخيارات التالية لوضع علامة على موارد BigQuery:
يمكن للموصل تشغيل مهام التحميل والاستعلام في BigQuery. تتم إضافة التسميات إلى الوظائف بالطريقة التالية:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
سيؤدي هذا إلى إنشاء تسميات cost_center
= analytics
and usage
= nightly_etl
.
يستخدم للتعليق على جلسات القراءة والكتابة. معرف التتبع هو بالتنسيق Spark:ApplicationName:JobID
. يعد هذا خيارًا اختياريًا، ولاستخدامه يحتاج المستخدم إلى تعيين خاصية traceApplicationName
. يتم إنشاء JobID تلقائيًا بواسطة معرف مهمة Dataproc، مع الرجوع إلى معرف تطبيق Spark (مثل application_1648082975639_0001
). يمكن تجاوز معرف الوظيفة عن طريق تعيين خيار traceJobId
. لاحظ أن الطول الإجمالي لمعرف التتبع لا يمكن أن يزيد عن 256 حرفًا.
يمكن استخدام الموصل في أجهزة الكمبيوتر المحمولة Jupyter حتى لو لم يتم تثبيته على مجموعة Spark. ويمكن إضافته كجرة خارجية باستخدام الكود التالي:
بايثون:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
سكالا:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
في حالة استخدام مجموعة Spark لـ Scala 2.12 (وهو اختياري لـ Spark 2.4.x، وإلزامي في 3.0.x)، فإن الحزمة ذات الصلة هي com.google.cloud.spark:spark-bigquery-with-dependeency_ 2.12 :0.41.0. لمعرفة إصدار Scala المستخدم، يرجى تشغيل الكود التالي:
بايثون:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
سكالا:
scala . util . Properties . versionString
ما لم تكن ترغب في استخدام Scala API الضمني spark.read.bigquery("TABLE_ID")
، ليست هناك حاجة للتجميع مقابل الموصل.
لتضمين الموصل في مشروعك:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
يقوم Spark بملء الكثير من المقاييس التي يمكن للمستخدم النهائي العثور عليها في صفحة سجل الشرارة. لكن كل هذه المقاييس مرتبطة بالشرارة والتي يتم جمعها ضمنيًا دون أي تغيير من الموصل. ولكن هناك عدد قليل من المقاييس التي يتم ملؤها من BigQuery وتكون مرئية حاليًا في سجلات التطبيق والتي يمكن قراءتها في سجلات برنامج التشغيل/المنفذ.
من Spark 3.2 فصاعدًا ، قدمت Spark واجهة برمجة التطبيقات لفضح المقاييس المخصصة في صفحة الشرارة واجهة المستخدم https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /custommetric.html
حاليًا ، باستخدام واجهة برمجة التطبيقات هذه ، يعرض الموصل مقاييس BigQuery التالية أثناء القراءة
<style> جدول#metricstable td ، table th {word-break: break-word} </style>اسم متري | وصف |
---|---|
bytes read | قراءة عدد بايتات كبيرة |
rows read | قراءة عدد صفوف كبيرة |
scan time | مقدار الوقت الذي يقضيه بين استجابة صفوف القراءة المطلوبة الحصول على جميع المنفذين ، بالميلي ثانية. |
parse time | مقدار الوقت الذي تقضيه في تحليل الصفوف يقرأ عبر جميع المنفذين ، بالميلي ثانية. |
spark time | مقدار الوقت الذي تقضيه في Spark لمعالجة الاستعلامات (أي ، بصرف النظر عن المسح والتحليل) ، في جميع المنفذين ، بالمللي ثانية. |
ملاحظة: لاستخدام المقاييس في صفحة شرارة واجهة المستخدم ، تحتاج إلى التأكد من أن spark-bigquery-metrics-0.41.0.jar
هو مسار الفصل قبل بدء خادم التاريخ وإصدار الموصل هو spark-3.2
أو أعلى.
انظر وثائق التسعير الكبيرة.
يمكنك ضبط عدد الأقسام يدويًا باستخدام خاصية maxParallelism
. قد توفر BigQuery أقسامًا أقل مما تطلبه. انظر تكوين التقسيم.
يمكنك أيضًا إعادة الإصلاح دائمًا بعد القراءة في Spark.
إذا كان هناك الكثير من الأقسام ، فقد يتم تجاوز حصص الإنتاجية أو الحصص الإنتاجية. يحدث هذا لأنه بينما تتم معالجة البيانات الموجودة في كل قسم بشكل تسلسلي ، يمكن معالجة أقسام مستقلة بالتوازي مع العقد المختلفة داخل مجموعة الشرارة. بشكل عام ، لضمان أقصى قدر من الإنتاجية المستدامة ، يجب عليك تقديم طلب زيادة الحصص. ومع ذلك ، يمكنك أيضًا تقليل عدد الأقسام التي يتم كتابتها يدويًا عن طريق استدعاء coalesce
على نظام البيانات للتخفيف من هذه المشكلة.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
هناك قاعدة عامة تتمثل في الحصول على مقبض قسم واحد لا يقل عن 1 جيجابايت من البيانات.
لاحظ أيضًا أن الوظيفة التي تعمل مع خاصية writeAtLeastOnce
التي تم تشغيلها لن تواجه أخطاء حصة CreaterRiteStream.
يحتاج الموصل إلى مثيل لـ googlecredentials من أجل الاتصال بواجهة برمجة تطبيقات BigQuery. هناك خيارات متعددة لتوفيرها:
GOOGLE_APPLICATION_CREDENTIALS
، كما هو موضح هنا. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
. يجب تنفيذ AccessTokenProvider
في Java أو لغة JVM الأخرى مثل Scala أو Kotlin. يجب أن يكون إما مُنشئًا لا يوجد أو مُنشئ يقبل حجة java.util.String
واحدة. يمكن توفير معلمة التكوين هذه باستخدام خيار gcpAccessTokenProviderConfig
. إذا لم يتم توفير هذا ، فسيتم استدعاء مُنشئ عدم وجود arg. يجب أن تكون الجرة التي تحتوي على التنفيذ على Classpath للمجموعة. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
يمكن تكوين انتحال شخصية حساب الخدمة لاسم مستخدم معين واسم مجموعة ، أو لجميع المستخدمين افتراضيًا باستخدام الخصائص أدناه:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(لم يتم تعيينه افتراضيًا)
انتحال شخصية حساب الخدمة لمستخدم معين.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(لم يتم تعيينه افتراضيًا)
انتحال شخصية حساب الخدمة لمجموعة معينة.
gcpImpersonationServiceAccount
(لم يتم تعيينه افتراضيًا)
انتحال شخصية حساب الخدمة الافتراضية لجميع المستخدمين.
إذا تم تعيين أي من الخصائص المذكورة أعلاه ، فسيتم انتحال شخصية حساب الخدمة المحددة عن طريق إنشاء بيانات اعتماد قصيرة الأجل عند الوصول إلى BigQuery.
إذا تم تعيين أكثر من خاصية واحدة ، فإن حساب الخدمة المرتبط باسم المستخدم سوف يكون له الأسبقية على حساب الخدمة المرتبط باسم المجموعة للمستخدم والمجموعة المتطابقة ، والتي بدورها ستأخذ الأسبقية على انتحال حساب حساب الخدمة الافتراضي.
لتطبيق أبسط ، حيث لا يلزم تحديث الرمز المميز للوصول ، يتمثل بديل آخر في تمرير رمز الوصول كخيار تكوين gcpAccessToken
. يمكنك الحصول على رمز الوصول عن طريق تشغيل gcloud auth application-default print-access-token
.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
هام: يجب تنفيذ CredentialsProvider
و AccessTokenProvider
في Java أو لغة JVM الأخرى مثل Scala أو Kotlin. يجب أن تكون الجرة التي تحتوي على التنفيذ على Classpath للمجموعة.
إشعار: يجب توفير واحد فقط من الخيارات المذكورة أعلاه.
للاتصال بالوكيل الأمامي ولتوصل إلى بيانات اعتماد المستخدم ، قم بتكوين الخيارات التالية.
proxyAddress
: عنوان خادم الوكيل. يجب أن يكون الوكيل وكيل HTTP ويجب أن يكون العنوان في host:port
.
proxyUsername
: اسم المستخدم المستخدم للاتصال بالوكالة.
proxyPassword
: كلمة المرور المستخدمة للاتصال بالوكيل.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
يمكن أيضًا تعيين نفس معلمات الوكيل على مستوى العالم باستخدام Spark's RunTimeConfig مثل هذا:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
يمكنك تعيين ما يلي في تكوين Hadoop أيضًا.
fs.gs.proxy.address
(على غرار "proxyaddress") ، fs.gs.proxy.username
(على غرار "proxyusername") و fs.gs.proxy.password
(على غرار "proxypasword").
إذا تم تعيين نفس المعلمة في أماكن متعددة ، يكون ترتيب الأولوية كما يلي:
الخيار ("المفتاح" ، "القيمة")> Spark.conf> تكوين Hadoop