Konektor ini mendukung pembacaan tabel Google BigQuery ke DataFrames Spark, dan menulis DataFrames kembali ke BigQuery. Hal ini dilakukan dengan menggunakan API Sumber Data Spark SQL untuk berkomunikasi dengan BigQuery.
Storage API mengalirkan data secara paralel langsung dari BigQuery melalui gRPC tanpa menggunakan Google Cloud Storage sebagai perantara.
Ini memiliki sejumlah keunggulan dibandingkan menggunakan alur baca berbasis ekspor sebelumnya yang secara umum akan menghasilkan kinerja baca yang lebih baik:
Itu tidak meninggalkan file sementara apa pun di Google Cloud Storage. Baris dibaca langsung dari server BigQuery menggunakan format kabel Arrow atau Avro.
API baru memungkinkan pemfilteran kolom dan predikat untuk hanya membaca data yang Anda minati.
Karena BigQuery didukung oleh penyimpanan data berbentuk kolom, BigQuery dapat mengalirkan data secara efisien tanpa membaca semua kolom.
Storage API mendukung penekanan filter predikat secara sewenang-wenang. Konektor versi 0.8.0-beta dan yang lebih baru mendukung penekanan filter arbitrer ke Bigquery.
Ada masalah umum di Spark yang tidak mengizinkan penekanan filter pada bidang bersarang. Misalnya - filter seperti address.city = "Sunnyvale"
tidak akan diturunkan ke Bigquery.
API menyeimbangkan kembali catatan antar pembaca hingga semuanya selesai. Ini berarti bahwa semua fase Peta akan selesai hampir secara bersamaan. Lihat artikel blog ini tentang cara sharding dinamis digunakan di Google Cloud Dataflow.
Lihat Mengonfigurasi Partisi untuk lebih jelasnya.
Ikuti petunjuk ini.
Jika Anda tidak memiliki lingkungan Apache Spark, Anda dapat membuat cluster Cloud Dataproc dengan autentikasi yang telah dikonfigurasi sebelumnya. Contoh berikut mengasumsikan Anda menggunakan Cloud Dataproc, namun Anda dapat menggunakan spark-submit
di cluster mana pun.
Setiap kluster Dataproc yang menggunakan API memerlukan cakupan 'bigquery' atau 'platform cloud'. Kluster Dataproc memiliki cakupan 'bigquery' secara default, sehingga sebagian besar kluster dalam proyek yang diaktifkan harus bekerja secara default, misalnya
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
Versi terbaru konektor tersedia untuk umum di tautan berikut:
versi | Link |
---|---|
Percikan 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (tautan HTTP) |
Percikan 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (tautan HTTP) |
Percikan 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (tautan HTTP) |
Percikan 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (tautan HTTP) |
Percikan 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (tautan HTTP) |
Percikan 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (tautan HTTP) |
Skala 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (tautan HTTP) |
Skala 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (tautan HTTP) |
Skala 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (tautan HTTP) |
Enam versi pertama adalah konektor berbasis Java yang menargetkan Spark 2.4/3.1/3.2/3.3/3.4/3.5 dari semua versi Scala yang dibangun di API Sumber Data baru (API Sumber Data v2) dari Spark.
Dua konektor terakhir adalah konektor berbasis Scala, harap gunakan toples yang relevan dengan instalasi Spark Anda seperti diuraikan di bawah.
Konektor Percikan | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
percikan-3,5-permintaan besar | ✓ | |||||||
percikan-3.4-permintaan besar | ✓ | ✓ | ||||||
percikan-3.3-permintaan besar | ✓ | ✓ | ✓ | |||||
percikan-3.2-permintaan besar | ✓ | ✓ | ✓ | ✓ | ||||
percikan-3.1-permintaan besar | ✓ | ✓ | ✓ | ✓ | ✓ | |||
percikan-2.4-permintaan besar | ✓ | |||||||
spark-bigquery-dengan-dependensi_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
spark-bigquery-dengan-dependensi_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
spark-bigquery-dengan-dependensi_2.11 | ✓ | ✓ |
Konektor Gambar Dataproc | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | Tanpa server Gambar 1.0 | Tanpa server Gambar 2.0 | Tanpa server Gambar 2.1 | Tanpa server Gambar 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
percikan-3,5-permintaan besar | ✓ | ✓ | ||||||||
percikan-3.4-permintaan besar | ✓ | ✓ | ✓ | |||||||
percikan-3.3-permintaan besar | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
percikan-3.2-permintaan besar | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
percikan-3.1-permintaan besar | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
percikan-2.4-permintaan besar | ✓ | ✓ | ||||||||
spark-bigquery-dengan-dependensi_2.13 | ✓ | ✓ | ✓ | |||||||
spark-bigquery-dengan-dependensi_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
spark-bigquery-dengan-dependensi_2.11 | ✓ | ✓ |
Konektor ini juga tersedia dari repositori Maven Central. Ini dapat digunakan menggunakan opsi --packages
atau properti konfigurasi spark.jars.packages
. Gunakan nilai berikut
versi | Artefak Konektor |
---|---|
Percikan 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
Percikan 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
Percikan 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
Percikan 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
Percikan 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
Percikan 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
Skala 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
Skala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
Skala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
Kluster Dataproc yang dibuat menggunakan gambar 2.1 dan yang lebih baru, atau batch yang menggunakan layanan tanpa server Dataproc dilengkapi dengan konektor Spark BigQuery bawaan. Menggunakan --jars
atau --packages
standar (atau alternatifnya, konfigurasi spark.jars
/ spark.jars.packages
) tidak akan membantu dalam kasus ini karena konektor internal lebih diutamakan.
Untuk menggunakan versi lain selain versi bawaan, lakukan salah satu hal berikut:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
, atau --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
untuk membuat cluster dengan jar yang berbeda. URL dapat mengarah ke JAR konektor apa pun yang valid untuk versi Spark kluster.--properties dataproc.sparkBqConnector.version=0.41.0
, atau --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
untuk membuat batch dengan toples yang berbeda. URL dapat mengarah ke JAR konektor apa pun yang valid untuk versi Spark runtime. Anda dapat menjalankan jumlah kata PySpark sederhana terhadap API tanpa kompilasi dengan menjalankan
Gambar Dataproc 1.5 ke atas
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Gambar Dataproc 1.4 dan di bawahnya
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
Konektor menggunakan API Sumber Data Spark SQL lintas bahasa:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
atau API implisit khusus Scala:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
Untuk informasi selengkapnya, lihat contoh kode tambahan dalam Python, Scala, dan Java.
Konektor ini memungkinkan Anda menjalankan kueri SQL SELECT Standar apa pun di BigQuery dan mengambil hasilnya langsung ke Spark Dataframe. Ini mudah dilakukan seperti yang dijelaskan dalam contoh kode berikut:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
Yang membuahkan hasil
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
Opsi kedua adalah menggunakan opsi query
seperti ini:
df = spark.read.format("bigquery").option("query", sql).load()
Perhatikan bahwa eksekusi harus lebih cepat karena hanya hasilnya yang dikirimkan melalui kabel. Dengan cara yang sama, kueri dapat menyertakan GABUNG secara lebih efisien daripada menjalankan gabungan di Spark atau menggunakan fitur BigQuery lainnya seperti subkueri, fungsi yang ditentukan pengguna BigQuery, tabel wildcard, BigQuery ML, dan banyak lagi.
Untuk menggunakan fitur ini konfigurasi berikut HARUS diatur:
viewsEnabled
harus disetel ke true
.materializationDataset
harus disetel ke set data yang pengguna GCP-nya memiliki izin pembuatan tabel. materializationProject
adalah opsional. Catatan: Seperti disebutkan dalam dokumentasi BigQuery, tabel yang dikueri harus berada di lokasi yang sama dengan materializationDataset
. Juga, jika tabel dalam SQL statement
berasal dari proyek selain parentProject
maka gunakan nama tabel yang sepenuhnya memenuhi syarat yaitu [project].[dataset].[table]
.
Penting: Fitur ini diterapkan dengan menjalankan kueri di BigQuery dan menyimpan hasilnya ke dalam tabel sementara, yang hasilnya akan dibaca oleh Spark. Hal ini mungkin menambah biaya tambahan pada akun BigQuery Anda.
Konektor ini memiliki dukungan awal untuk membaca dari tampilan BigQuery. Harap dicatat ada beberapa peringatan:
collect()
atau count()
apa pun.materializationProject
dan materializationDataset
opsional. Opsi ini juga dapat diatur secara global dengan memanggil spark.conf.set(...)
sebelum membaca tampilan..option("viewsEnabled", "true")
) atau atur secara global dengan memanggil spark.conf.set("viewsEnabled", "true")
.materializationDataset
harus berada di lokasi yang sama dengan tampilan.Menulis DataFrames ke BigQuery dapat dilakukan dengan dua metode: Langsung dan Tidak Langsung.
Dalam metode ini data ditulis langsung ke BigQuery menggunakan BigQuery Storage Write API. Untuk mengaktifkan opsi ini, harap atur opsi writeMethod
ke direct
, seperti yang ditunjukkan di bawah ini:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
Penulisan ke tabel terpartisi yang sudah ada (tanggal dipartisi, waktu penyerapan dipartisi, dan rentang dipartisi) dalam mode simpan APPEND dan mode OVERWRITE (hanya partisi tanggal dan rentang) didukung sepenuhnya oleh konektor dan BigQuery Storage Write API. Penggunaan datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
yang dijelaskan di bawah saat ini tidak didukung oleh metode penulisan langsung.
Penting: Silakan merujuk ke halaman harga penyerapan data mengenai harga BigQuery Storage Write API.
Penting: Harap gunakan versi 0.24.2 ke atas untuk penulisan langsung, karena versi sebelumnya memiliki bug yang dapat menyebabkan penghapusan tabel dalam kasus tertentu.
Dalam metode ini, data ditulis terlebih dahulu ke GCS, lalu dimuat ke BigQuery. Bucket GCS harus dikonfigurasi untuk menunjukkan lokasi data sementara.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
Data disimpan sementara menggunakan format Apache Parquet, Apache ORC atau Apache Avro.
Bucket GCS dan formatnya juga dapat disetel secara global menggunakan RuntimeConfig Spark seperti ini:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
Saat melakukan streaming DataFrame ke BigQuery, setiap batch ditulis dengan cara yang sama seperti DataFrame non-streaming. Perhatikan bahwa lokasi pos pemeriksaan yang kompatibel dengan HDFS (misalnya: path/to/HDFS/dir
atau gs://checkpoint-bucket/checkpointDir
) harus ditentukan.
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
Penting: Konektor tidak mengonfigurasi konektor GCS, untuk menghindari konflik dengan konektor GCS lain, jika ada. Untuk menggunakan kemampuan tulis konektor, konfigurasikan konektor GCS di cluster Anda seperti yang dijelaskan di sini.
API Mendukung sejumlah opsi untuk mengonfigurasi pembacaan
<style> tabel#propertytable td, tabel th { kata-break:break-word } </style>Milik | Arti | Penggunaan |
---|---|---|
table | Tabel BigQuery dalam format [[project:]dataset.]table . Disarankan untuk menggunakan parameter path load() / save() sebagai gantinya. Opsi ini sudah tidak digunakan lagi dan akan dihapus di versi mendatang.(Tidak digunakan lagi) | Baca/Tulis |
dataset | Kumpulan data yang berisi tabel. Opsi ini harus digunakan dengan tabel dan tampilan standar, namun tidak saat memuat hasil kueri. (Opsional kecuali dihilangkan dalam table ) | Baca/Tulis |
project | ID Proyek Google Cloud dari tabel. Opsi ini harus digunakan dengan tabel dan tampilan standar, namun tidak saat memuat hasil kueri. (Opsional. Default pada proyek Akun Layanan yang digunakan) | Baca/Tulis |
parentProject | ID Project Google Cloud dari tabel yang akan ditagih untuk ekspor. (Opsional. Default pada proyek Akun Layanan yang digunakan) | Baca/Tulis |
maxParallelism | Jumlah maksimal partisi untuk membagi data. Jumlah sebenarnya mungkin lebih sedikit jika BigQuery menganggap datanya cukup kecil. Jika pelaksana tidak cukup untuk menjadwalkan pembaca per partisi, beberapa partisi mungkin kosong. Penting: Parameter lama ( parallelism ) masih didukung tetapi dalam mode tidak digunakan lagi. Ini akan dihapus di konektor versi 1.0.(Opsional. Defaultnya adalah MinParallelism pilihan yang lebih besar dan 20.000).) | Membaca |
preferredMinParallelism | Jumlah partisi minimal yang diinginkan untuk membagi data. Jumlah sebenarnya mungkin lebih sedikit jika BigQuery menganggap datanya cukup kecil. Jika pelaksana tidak cukup untuk menjadwalkan pembaca per partisi, beberapa partisi mungkin kosong. (Opsional. Defaultnya adalah yang terkecil 3 kali paralelisme dan maxParallelism default aplikasi.) | Membaca |
viewsEnabled | Memungkinkan konektor untuk membaca dari tampilan dan bukan hanya tabel. Harap baca bagian yang relevan sebelum mengaktifkan opsi ini. (Opsional. Defaultnya adalah false ) | Membaca |
materializationProject | Id proyek tempat tampilan terwujud akan dibuat (Opsional. Defaultnya adalah id proyek tampilan) | Membaca |
materializationDataset | Kumpulan data tempat tampilan terwujud akan dibuat. Kumpulan data ini harus berada di lokasi yang sama dengan tampilan atau tabel yang dikueri. (Opsional. Default untuk kumpulan data tampilan) | Membaca |
materializationExpirationTimeInMinutes | Waktu kedaluwarsa tabel sementara yang menyimpan data terwujud dari tampilan atau kueri, dalam hitungan menit. Perhatikan bahwa konektor mungkin menggunakan kembali tabel sementara karena penggunaan cache lokal dan untuk mengurangi komputasi BigQuery, sehingga nilai yang sangat rendah dapat menyebabkan error. Nilainya harus berupa bilangan bulat positif. (Opsional. Defaultnya adalah 1440, atau 24 jam) | Membaca |
readDataFormat | Format Data untuk membaca dari BigQuery. Pilihan : ARROW , AVRO (Opsional. Defaultnya adalah ARROW ) | Membaca |
optimizedEmptyProjection | Konektor menggunakan logika proyeksi kosong (pilih tanpa kolom apa pun) yang dioptimalkan, yang digunakan untuk eksekusi count() . Logika ini mengambil data langsung dari metadata tabel atau melakukan `SELECT COUNT(*) WHERE...` yang lebih efisien jika ada filter. Anda dapat membatalkan penggunaan logika ini dengan menyetel opsi ini ke false .(Opsional, defaultnya adalah true ) | Membaca |
pushAllFilters | Jika disetel ke true , konektor akan mendorong semua filter yang dapat didelegasikan Spark ke BigQuery Storage API. Hal ini mengurangi jumlah data yang perlu dikirim dari server BigQuery Storage API ke klien Spark. Opsi ini sudah tidak digunakan lagi dan akan dihapus di versi mendatang.(Opsional, defaultnya adalah true )(Tidak digunakan lagi) | Membaca |
bigQueryJobLabel | Dapat digunakan untuk menambahkan label ke kueri yang dimulai oleh konektor dan memuat tugas BigQuery. Beberapa label dapat diatur. (Opsional) | Membaca |
bigQueryTableLabel | Dapat digunakan untuk menambahkan label pada tabel sambil menulis pada tabel. Beberapa label dapat diatur. (Opsional) | Menulis |
traceApplicationName | Nama aplikasi yang digunakan untuk melacak sesi baca dan tulis BigQuery Storage. Menyetel nama aplikasi diperlukan untuk menyetel ID jejak pada sesi. (Opsional) | Membaca |
traceJobId | ID tugas yang digunakan untuk melacak sesi baca dan tulis BigQuery Storage. (Opsional, default untuk ID pekerjaan Dataproc ada, jika tidak, gunakan ID aplikasi Spark) | Membaca |
createDisposition | Menentukan apakah pekerjaan diperbolehkan untuk membuat tabel baru. Nilai yang diizinkan adalah:
(Opsional. Defaultnya adalah CREATE_IF_NEEDED). | Menulis |
writeMethod | Mengontrol metode penulisan data ke BigQuery. Nilai yang tersedia bersifat direct untuk menggunakan BigQuery Storage Write API dan indirect yang menulis data terlebih dahulu ke GCS lalu memicu operasi pemuatan BigQuery. Lihat selengkapnya di sini(Opsional, defaultnya adalah indirect ) | Menulis |
writeAtLeastOnce | Menjamin bahwa data ditulis ke BigQuery setidaknya sekali. Ini adalah jaminan yang lebih rendah dari sekali. Ini cocok untuk skenario streaming di mana data terus-menerus ditulis dalam jumlah kecil. (Opsional. Defaultnya adalah false )Hanya didukung oleh metode penulisan `DIRECT` dan mode BUKAN `Timpa`. | Menulis |
temporaryGcsBucket | Bucket GCS yang menyimpan data untuk sementara sebelum dimuat ke BigQuery. Diperlukan kecuali diatur dalam konfigurasi Spark ( spark.conf.set(...) ).Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
persistentGcsBucket | Bucket GCS yang menyimpan data sebelum dimuat ke BigQuery. Jika diinformasikan, data tidak akan dihapus setelah menulis data ke BigQuery. Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
persistentGcsPath | Jalur GCS yang menyimpan data sebelum dimuat ke BigQuery. Hanya digunakan dengan persistentGcsBucket .Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
intermediateFormat | Format data sebelum dimuat ke BigQuery, nilainya dapat berupa "parquet", "orc" atau "avro". Untuk menggunakan format Avro, paket spark-avro harus ditambahkan saat runtime. (Opsional. Defaultnya adalah parquet ). Hanya pada tulisan. Hanya didukung untuk metode penulisan `INDIRECT`. | Menulis |
useAvroLogicalTypes | Saat memuat dari Avro (`.option("intermediateFormat", "avro")`), BigQuery menggunakan jenis Avro yang mendasarinya, bukan jenis logis [secara default](https://cloud.google.com/bigquery/docs/ memuat-data-penyimpanan-cloud-avro#logical_types). Menyediakan opsi ini akan mengonversi tipe logika Avro ke tipe data BigQuery yang sesuai. (Opsional. Defaultnya adalah false ). Hanya pada tulisan. | Menulis |
datePartition | Tanggal partisi tempat data akan ditulis. Harus berupa string tanggal yang diberikan dalam format YYYYMMDD . Dapat digunakan untuk menimpa data satu partisi, seperti ini:
(Opsional). Hanya pada tulisan. Dapat juga digunakan dengan tipe partisi yang berbeda seperti: JAM: YYYYMMDDHH BULAN: YYYYMM TAHUN: YYYY Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
partitionField | Jika bidang ini ditentukan, tabel akan dipartisi berdasarkan bidang ini. Untuk Waktu partisi, tentukan bersama dengan opsi `partitionType`. Untuk partisi rentang bilangan bulat, tentukan bersama dengan 3 opsi: `partitionRangeStart`, `partitionRangeEnd, `partitionRangeInterval`. Bidang tersebut harus berupa bidang TIMESTAMP atau DATE tingkat atas untuk partisi Waktu, atau INT64 untuk partisi rentang Integer. Modusnya harus NULLABLE atau REQUIRED . Jika opsi ini tidak disetel untuk tabel yang dipartisi Waktu, maka tabel tersebut akan dipartisi menurut kolom semu, yang direferensikan melalui '_PARTITIONTIME' as TIMESTAMP , atau '_PARTITIONDATE' as DATE .(Opsional). Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
partitionExpirationMs | Jumlah milidetik untuk menyimpan penyimpanan partisi dalam tabel. Penyimpanan dalam sebuah partisi akan memiliki waktu kedaluwarsa dari waktu partisinya ditambah nilai ini. (Opsional). Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
partitionType | Digunakan untuk menentukan partisi Waktu. Jenis yang didukung adalah: HOUR, DAY, MONTH, YEAR Opsi ini wajib untuk tabel target yang akan dipartisi Waktu. (Opsional. Defaultnya adalah DAY jika PartitionField ditentukan). Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | Digunakan untuk menentukan partisi rentang bilangan bulat. Opsi ini wajib untuk tabel target yang dipartisi dalam rentang bilangan bulat. Ketiga opsi harus ditentukan. Tidak didukung oleh metode penulisan `DIRECT`. | Menulis |
clusteredFields | Serangkaian kolom tingkat atas yang tidak berulang dan dipisahkan dengan koma. (Opsional). | Menulis |
allowFieldAddition | Menambahkan ALLOW_FIELD_ADDITION SchemaUpdateOption ke BigQuery LoadJob. Nilai yang diperbolehkan adalah true dan false .(Opsional. Defaultnya adalah false ).Hanya didukung oleh metode penulisan `INDIRECT`. | Menulis |
allowFieldRelaxation | Menambahkan ALLOW_FIELD_RELAXATION SchemaUpdateOption ke BigQuery LoadJob. Nilai yang diperbolehkan adalah true dan false .(Opsional. Defaultnya adalah false ).Hanya didukung oleh metode penulisan `INDIRECT`. | Menulis |
proxyAddress | Alamat server proksi. Proksi harus berupa proksi HTTP dan alamatnya harus dalam format `host:port`. Sebagai alternatif, dapat diatur dalam konfigurasi Spark ( spark.conf.set(...) ) atau dalam Konfigurasi Hadoop ( fs.gs.proxy.address ).(Opsional. Diperlukan hanya jika terhubung ke GCP melalui proxy.) | Baca/Tulis |
proxyUsername | Nama pengguna yang digunakan untuk terhubung ke proxy. Sebagai alternatif, dapat diatur dalam konfigurasi Spark ( spark.conf.set(...) ) atau dalam Konfigurasi Hadoop ( fs.gs.proxy.username ).(Opsional. Diperlukan hanya jika terhubung ke GCP melalui proxy dengan autentikasi.) | Baca/Tulis |
proxyPassword | Kata sandi yang digunakan untuk terhubung ke proxy. Sebagai alternatif, dapat diatur dalam konfigurasi Spark ( spark.conf.set(...) ) atau dalam Konfigurasi Hadoop ( fs.gs.proxy.password ).(Opsional. Diperlukan hanya jika terhubung ke GCP melalui proxy dengan autentikasi.) | Baca/Tulis |
httpMaxRetry | Jumlah maksimum percobaan ulang untuk permintaan HTTP tingkat rendah ke BigQuery. Sebagai alternatif, dapat diatur dalam konfigurasi Spark ( spark.conf.set("httpMaxRetry", ...) ) atau dalam Konfigurasi Hadoop ( fs.gs.http.max.retry ).(Opsional. Defaultnya adalah 10) | Baca/Tulis |
httpConnectTimeout | Batas waktu dalam milidetik untuk membuat koneksi dengan BigQuery. Sebagai alternatif, dapat diatur dalam konfigurasi Spark ( spark.conf.set("httpConnectTimeout", ...) ) atau dalam Konfigurasi Hadoop ( fs.gs.http.connect-timeout ).(Opsional. Defaultnya adalah 60000 ms. 0 untuk batas waktu tak terbatas, angka negatif untuk 20000) | Baca/Tulis |
httpReadTimeout | Batas waktu dalam milidetik untuk membaca data dari koneksi yang dibuat. Sebagai alternatif, dapat diatur dalam konfigurasi Spark ( spark.conf.set("httpReadTimeout", ...) ) atau dalam Konfigurasi Hadoop ( fs.gs.http.read-timeout ).(Opsional. Defaultnya adalah 60000 ms. 0 untuk batas waktu tak terbatas, angka negatif untuk 20000) | Membaca |
arrowCompressionCodec | Codec kompresi saat membaca dari tabel BigQuery saat menggunakan format Panah. Opsi : ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED . Codec kompresi yang disarankan adalah ZSTD saat menggunakan Java.(Opsional. Defaultnya adalah COMPRESSION_UNSPECIFIED yang berarti tidak ada kompresi yang akan digunakan) | Membaca |
responseCompressionCodec | Codec kompresi digunakan untuk mengompresi data ReadRowsResponse. Pilihan: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 (Opsional. Defaultnya adalah RESPONSE_COMPRESSION_CODEC_UNSPECIFIED yang berarti tidak ada kompresi yang akan digunakan) | Membaca |
cacheExpirationTimeInMinutes | Waktu kedaluwarsa cache dalam memori yang menyimpan informasi kueri. Untuk menonaktifkan caching, setel nilainya ke 0. (Opsional. Defaultnya adalah 15 menit) | Membaca |
enableModeCheckForSchemaFields | Memeriksa mode setiap bidang dalam skema tujuan agar sama dengan mode dalam skema bidang sumber terkait, selama penulisan LANGSUNG. Nilai defaultnya adalah true yaitu, pemeriksaan dilakukan secara default. Jika disetel ke false, pemeriksaan mode diabaikan. | Menulis |
enableListInference | Menunjukkan apakah akan menggunakan inferensi skema secara spesifik ketika modenya adalah Parket (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). Defaultnya salah. | Menulis |
bqChannelPoolSize | Ukuran (tetap) kumpulan saluran gRPC yang dibuat oleh BigQueryReadClient. Untuk performa optimal, ini harus diatur ke setidaknya jumlah inti pada pelaksana cluster. | Membaca |
createReadSessionTimeoutInSeconds | Batas waktu dalam hitungan detik untuk membuat ReadSession saat membaca tabel. Untuk tabel yang sangat besar, nilai ini harus ditingkatkan. (Opsional. Defaultnya adalah 600 detik) | Membaca |
queryJobPriority | Tingkat prioritas ditetapkan untuk pekerjaan saat membaca data dari kueri BigQuery. Nilai yang diizinkan adalah:
(Opsional. Defaultnya adalah INTERACTIVE ) | Baca/Tulis |
destinationTableKmsKeyName | Menjelaskan kunci enkripsi Cloud KMS yang akan digunakan untuk melindungi tabel BigQuery tujuan. Akun Layanan BigQuery yang terkait dengan proyek Anda memerlukan akses ke kunci enkripsi ini. untuk informasi lebih lanjut tentang penggunaan CMEK dengan BigQuery, lihat [di sini](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id). Perhatian: Tabel akan dienkripsi dengan kunci hanya jika dibuat oleh konektor. Tabel tidak terenkripsi yang sudah ada sebelumnya tidak akan dienkripsi hanya dengan mengatur opsi ini. (Opsional) | Menulis |
allowMapTypeConversion | Konfigurasi Boolean untuk menonaktifkan konversi dari data BigQuery ke Spark MapType ketika data memiliki dua subkolom dengan nama kolom sebagai key dan value . Nilai defaultnya adalah true yang memungkinkan konversi.(Opsional) | Membaca |
spark.sql.sources.partitionOverwriteMode | Konfigurasi untuk menentukan mode timpa saat menulis ketika tabel dipartisi rentang/waktu. Saat ini didukung dua mode: STATIC dan DYNAMIC . Dalam mode STATIC , seluruh tabel ditimpa. Dalam mode DYNAMIC , data ditimpa oleh partisi tabel yang ada. Nilai defaultnya adalah STATIC .(Opsional) | Menulis |
enableReadSessionCaching | Konfigurasi Boolean untuk menonaktifkan cache sesi baca. Menyimpan cache sesi baca BigQuery untuk memungkinkan perencanaan kueri Spark yang lebih cepat. Nilai defaultnya adalah true .(Opsional) | Membaca |
readSessionCacheDurationMins | Konfigurasi untuk mengatur durasi cache sesi baca dalam hitungan menit. Hanya berfungsi jika enableReadSessionCaching true (default). Memungkinkan menentukan durasi untuk menyimpan sesi baca dalam cache. Nilai maksimum yang diperbolehkan adalah 300 . Nilai defaultnya adalah 5 .(Opsional) | Membaca |
bigQueryJobTimeoutInMinutes | Konfigurasi untuk menyetel batas waktu tugas BigQuery dalam hitungan menit. Nilai defaultnya adalah 360 menit.(Opsional) | Baca/Tulis |
snapshotTimeMillis | Stempel waktu yang ditentukan dalam milidetik yang digunakan untuk membaca snapshot tabel. Secara default, ini tidak disetel dan versi tabel terbaru dibaca. (Opsional) | Membaca |
bigNumericDefaultPrecision | Presisi default alternatif untuk kolom BigNumeric, karena default BigQuery terlalu lebar untuk Spark. Nilai dapat berkisar antara 1 dan 38. Nilai default ini hanya digunakan bila kolom memiliki tipe BigNumeric yang tidak memiliki parameter. Harap dicatat bahwa mungkin ada kehilangan data jika presisi data sebenarnya lebih dari yang ditentukan. (Opsional) | Baca/Tulis |
bigNumericDefaultScale | Skala default alternatif untuk bidang BigNumeric. Nilainya bisa antara 0 dan 38, dan kurang dari bigNumericFieldsPrecision. Default ini hanya digunakan ketika kolom memiliki tipe BigNumeric yang tidak memiliki parameter. Harap dicatat bahwa mungkin ada kehilangan data jika skala data sebenarnya lebih besar dari yang ditentukan. (Opsional) | Baca/Tulis |
Opsi juga dapat diatur di luar kode, menggunakan parameter --conf
dari spark-submit
atau parameter --properties
dari gcloud dataproc submit spark
. Untuk menggunakan ini, tambahkan awalan spark.datasource.bigquery.
ke salah satu opsi, misalnya spark.conf.set("temporaryGcsBucket", "some-bucket")
juga dapat ditetapkan sebagai --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
.
Dengan pengecualian DATETIME
dan TIME
semua tipe data BigQuery mengarahkan peta ke tipe data Spark SQL yang sesuai. Berikut semua pemetaannya:
Tipe Data SQL Standar BigQuery | Percikan SQL Tipe Data | Catatan |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Silakan merujuk ke dukungan Numerik dan BigNumerik |
BIGNUMERIC | DecimalType | Silakan merujuk ke dukungan Numerik dan BigNumerik |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark tidak memiliki tipe DATETIME. String percikan dapat ditulis ke kolom BQ DATETIME yang sudah ada asalkan dalam format untuk literal BQ DATETIME. * Untuk Spark 3.4+, BQ DATETIME dibaca sebagai tipe TimestampNTZ Spark yaitu java LocalDateTime |
TIME | LongType , StringType * | Spark tidak memiliki tipe TIME. Long yang dihasilkan, yang menunjukkan mikrodetik sejak tengah malam, dapat dengan aman dimasukkan ke TimestampType, namun hal ini menyebabkan tanggal disimpulkan sebagai hari ini. Dengan demikian waktu tersisa selama dan pengguna dapat melakukan cast jika mereka mau. Saat mentransmisi ke Stempel Waktu, TIME memiliki masalah TimeZone yang sama dengan DATETIME * String percikan dapat ditulis ke kolom BQ TIME yang sudah ada asalkan dalam format untuk literal BQ TIME. |
JSON | StringType | Spark tidak memiliki tipe JSON. Nilainya dibaca sebagai String. Untuk menulis kembali JSON ke BigQuery, kondisi berikut DIPERLUKAN :
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery tidak memiliki jenis MAP, oleh karena itu mirip dengan konversi lain seperti pekerjaan Apache Avro dan BigQuery Load, konektor ini mengonversi Spark Map menjadi STRUCT<key,value> BERULANG. Artinya, meskipun penulisan dan pembacaan peta tersedia, menjalankan SQL di BigQuery yang menggunakan semantik peta tidak didukung. Untuk merujuk pada nilai peta menggunakan BigQuery SQL, silakan periksa dokumentasi BigQuery. Karena ketidakcocokan ini, beberapa batasan berlaku:
|
Spark ML Vector dan Matrix didukung, termasuk versi padat dan jarangnya. Data disimpan sebagai BigQuery RECORD. Perhatikan bahwa akhiran ditambahkan ke deskripsi bidang yang mencakup jenis percikan bidang tersebut.
Untuk menulis tipe tersebut ke BigQuery, gunakan format perantara ORC atau Avro, dan jadikan tipe tersebut sebagai kolom Baris (yaitu, bukan kolom dalam struct).
BigNumeric BigQuery memiliki presisi 76,76 (digit ke-77 adalah parsial) dan skala 38. Karena presisi dan skala ini berada di luar dukungan DecimalType (skala 38 dan presisi 38) spark, artinya kolom BigNumeric dengan presisi lebih besar dari 38 tidak dapat digunakan . Setelah batasan Spark ini diperbarui, konektor akan diperbarui sesuai dengan itu.
Konversi Spark Decimal/BigQuery Numeric mencoba mempertahankan parameterisasi tipenya, yaitu NUMERIC(10,2)
akan dikonversi ke Decimal(10,2)
dan sebaliknya. Namun perhatikan bahwa ada kasus di mana parameternya hilang. Ini berarti parameter akan dikembalikan ke default - NUMERIK (38,9) dan BIGNUMERIC(76,38). Artinya, saat ini, pembacaan BigNumeric hanya didukung dari tabel standar, namun tidak dari tampilan BigQuery atau saat membaca data dari kueri BigQuery.
Konektor secara otomatis menghitung kolom dan filter pushdown pernyataan SELECT
DataFrame misalnya
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
filter ke kolom word
dan tekan filter predikat word = 'hamlet' or word = 'Claudius'
.
Jika Anda tidak ingin membuat beberapa permintaan baca ke BigQuery, Anda dapat menyimpan DataFrame dalam cache sebelum memfilter, misalnya:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
Anda juga dapat menentukan opsi filter
secara manual, yang akan mengesampingkan pushdown otomatis dan Spark akan melakukan pemfilteran selanjutnya di klien.
Kolom semu _PARTITIONDATE dan _PARTITIONTIME bukan bagian dari skema tabel. Oleh karena itu, untuk melakukan kueri berdasarkan partisi tabel yang dipartisi, jangan gunakan metode Where() yang ditunjukkan di atas. Sebagai gantinya, tambahkan opsi filter dengan cara berikut:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
Secara default konektor membuat satu partisi per 400MB dalam tabel yang sedang dibaca (sebelum memfilter). Jumlah ini kira-kira harus sesuai dengan jumlah maksimum pembaca yang didukung oleh BigQuery Storage API. Ini dapat dikonfigurasi secara eksplisit dengan properti maxParallelism
. BigQuery mungkin membatasi jumlah partisi berdasarkan batasan server.
Untuk mendukung pelacakan penggunaan sumber daya BigQuery, konektor menawarkan opsi berikut untuk memberi tag pada sumber daya BigQuery:
Konektor dapat meluncurkan tugas pemuatan dan kueri BigQuery. Menambahkan label ke pekerjaan dilakukan dengan cara berikut:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
Ini akan membuat label cost_center
= analytics
dan usage
= nightly_etl
.
Digunakan untuk membubuhi keterangan pada sesi baca dan tulis. ID pelacakan memiliki format Spark:ApplicationName:JobID
. Ini adalah opsi keikutsertaan, dan untuk menggunakannya, pengguna perlu menyetel properti traceApplicationName
. JobID dibuat secara otomatis oleh ID pekerjaan Dataproc, dengan fallback ke ID aplikasi Spark (seperti application_1648082975639_0001
). ID Pekerjaan dapat diganti dengan mengatur opsi traceJobId
. Perhatikan bahwa total panjang ID jejak tidak boleh lebih dari 256 karakter.
Konektor ini dapat digunakan di notebook Jupyter meskipun tidak diinstal di kluster Spark. Itu dapat ditambahkan sebagai toples eksternal menggunakan kode berikut:
ular piton:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
skala:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
Jika kluster Spark menggunakan Scala 2.12 (opsional untuk Spark 2.4.x, wajib di 3.0.x), maka paket yang relevan adalah com.google.cloud.spark:spark-bigquery-with-dependencies_ 2.12 :0.41.0. Untuk mengetahui versi Scala yang digunakan, silakan jalankan kode berikut:
ular piton:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
skala:
scala . util . Properties . versionString
Kecuali Anda ingin menggunakan Scala API spark.read.bigquery("TABLE_ID")
implisit, Anda tidak perlu melakukan kompilasi terhadap konektor.
Untuk menyertakan konektor dalam proyek Anda:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark mengisi banyak metrik yang dapat ditemukan oleh pengguna akhir di halaman riwayat percikan. Namun semua metrik ini terkait dengan percikan yang dikumpulkan secara implisit tanpa perubahan apa pun dari konektornya. Namun ada beberapa metrik yang diisi dari BigQuery dan saat ini terlihat di log aplikasi yang dapat dibaca di log driver/pelaksana.
Dari Spark 3.2 dan seterusnya, Spark telah menyediakan API untuk mengekspos metrik khusus di halaman Spark UI https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /Custommetric.html
Saat ini, menggunakan API ini, konektor memaparkan metrik BigQuery berikut selama membaca
<tyle> tabel#metricstable td, tabel th {word-break: break-word} </tyle>Nama metrik | Keterangan |
---|---|
bytes read | Jumlah BigQuery Bytes Dibaca |
rows read | Jumlah Baris BigQuery Dibaca |
scan time | Jumlah waktu yang dihabiskan di antara respons baca yang diminta untuk diperoleh di semua pelaksana, dalam milidetik. |
parse time | Jumlah waktu yang dihabiskan untuk mem -parsing baris dibaca di semua pelaksana, dalam milidetik. |
spark time | Jumlah waktu yang dihabiskan dalam Spark untuk memproses pertanyaan (yaitu, selain pemindaian dan penguraian), di semua pelaksana, dalam milidetik. |
CATATAN: Untuk menggunakan metrik di halaman Spark UI, Anda perlu memastikan spark-bigquery-metrics-0.41.0.jar
adalah jalur kelas sebelum memulai server riwayat dan versi konektor adalah spark-3.2
atau lebih tinggi.
Lihat dokumentasi harga BigQuery.
Anda dapat secara manual mengatur jumlah partisi dengan properti maxParallelism
. BigQuery dapat menyediakan lebih sedikit partisi daripada yang Anda minta. Lihat Mengkonfigurasi Partisi.
Anda juga dapat selalu repartisi setelah membaca di Spark.
Jika ada terlalu banyak partisi, Kuota CreateWriteStream atau Throughput dapat terlampaui. Ini terjadi karena sementara data dalam setiap partisi diproses secara serial, partisi independen dapat diproses secara paralel pada node yang berbeda dalam kluster percikan. Secara umum, untuk memastikan throughput berkelanjutan maksimum Anda harus mengajukan permintaan peningkatan kuota. Namun, Anda juga dapat secara manual mengurangi jumlah partisi yang ditulis dengan memanggil coalesce
di DataFrame untuk mengurangi masalah ini.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
Aturan praktis adalah memiliki pegangan partisi tunggal setidaknya 1GB data.
Perhatikan juga bahwa pekerjaan yang berjalan dengan properti writeAtLeastOnce
dihidupkan tidak akan menghadapi kesalahan kuota CreateWriteStream.
Konektor membutuhkan instance GoogleCredentials untuk terhubung ke API BigQuery. Ada beberapa opsi untuk menyediakannya:
GOOGLE_APPLICATION_CREDENTIALS
, seperti yang dijelaskan di sini. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
. AccessTokenProvider
harus diimplementasikan dalam Java atau bahasa JVM lainnya seperti Scala atau Kotlin. Itu harus memiliki konstruktor no-ARG atau konstruktor yang menerima argumen java.util.String
tunggal. Parameter konfigurasi ini dapat disediakan menggunakan opsi gcpAccessTokenProviderConfig
. Jika ini tidak disediakan maka konstruktor no-arg akan dipanggil. Tabung yang berisi implementasi harus ada di classpath cluster. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
Peniruan Akun Layanan dapat dikonfigurasi untuk nama pengguna tertentu dan nama grup, atau untuk semua pengguna secara default menggunakan properti di bawah ini:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(tidak diatur secara default)
Peniruan akun layanan untuk pengguna tertentu.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(tidak diatur secara default)
Peniruan akun layanan untuk grup tertentu.
gcpImpersonationServiceAccount
(tidak ditetapkan secara default)
Peniruan akun layanan default untuk semua pengguna.
Jika salah satu dari properti di atas ditetapkan maka akun layanan yang ditentukan akan disamar dengan menghasilkan kredensial berumur pendek saat mengakses BigQuery.
Jika lebih dari satu properti ditetapkan maka akun layanan yang terkait dengan nama pengguna akan lebih diutamakan daripada akun layanan yang terkait dengan nama grup untuk pengguna dan grup yang cocok, yang pada gilirannya akan diutamakan daripada peniruan akun layanan default.
Untuk aplikasi yang lebih sederhana, di mana penyegaran token akses tidak diperlukan, alternatif lain adalah untuk meneruskan token akses sebagai opsi konfigurasi gcpAccessToken
. Anda bisa mendapatkan Token Access dengan menjalankan gcloud auth application-default print-access-token
.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
PENTING: CredentialsProvider
dan AccessTokenProvider
perlu diimplementasikan dalam Java atau bahasa JVM lainnya seperti Scala atau Kotlin. Tabung yang berisi implementasi harus ada di classpath cluster.
PEMBERITAHUAN: Hanya satu dari opsi di atas yang harus disediakan.
Untuk terhubung ke proxy maju dan untuk mengautentikasi kredensial pengguna, konfigurasikan opsi berikut.
proxyAddress
: Alamat server proxy. Proxy harus berupa proxy dan alamat HTTP harus di host:port
.
proxyUsername
: Nama pengguna yang digunakan untuk terhubung ke proxy.
proxyPassword
: Kata sandi yang digunakan untuk terhubung ke proxy.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
Parameter proxy yang sama juga dapat diatur secara global menggunakan runtimeconfig Spark seperti ini:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
Anda dapat mengatur yang berikut dalam konfigurasi Hadoop juga.
fs.gs.proxy.address
(mirip dengan "proxyAddress"), fs.gs.proxy.username
(mirip dengan "proxyusername") dan fs.gs.proxy.password
(mirip dengan "proxypassword").
Jika parameter yang sama diatur di beberapa tempat urutan prioritas adalah sebagai berikut:
Opsi ("Key", "Value")> Spark.conf> Hadoop Configuration