此連接器支援將 Google BigQuery 表格讀入 Spark 的 DataFrame,以及將 DataFrame 寫回 BigQuery。這是透過使用 Spark SQL 資料來源 API 與 BigQuery 通訊來完成的。
Storage API 透過 gRPC 直接從 BigQuery 並行傳輸數據,無需使用 Google Cloud Storage 作為中介。
與使用先前的基於導出的讀取流程相比,它具有許多優點,通常會帶來更好的讀取效能:
它不會在 Google Cloud Storage 中留下任何臨時檔案。使用 Arrow 或 Avro 有線格式直接從 BigQuery 伺服器讀取行。
新的 API 允許列和謂詞過濾以僅讀取您感興趣的資料。
由於 BigQuery 由列式數據存儲支持,因此它可以高效地串流數據,而無需讀取所有列。
儲存 API 支援謂詞過濾器的任意下推。連接器版本 0.8.0-beta 及更高版本支援將任意過濾器下推到 Bigquery。
Spark 中有一個已知問題,不允許在嵌套欄位上下推過濾器。例如,像address.city = "Sunnyvale"
這樣的濾鏡將不會下推到 Bigquery。
API 會在讀取器之間重新平衡記錄,直到它們全部完成。這意味著所有 Map 階段將幾乎同時完成。請參閱此部落格文章,以了解如何在 Google Cloud Dataflow 中類似地使用動態分片。
有關更多詳細信息,請參閱配置分區。
請遵循這些說明。
如果您沒有 Apache Spark 環境,您可以使用預先設定的驗證來建立 Cloud Dataproc 叢集。以下範例假設您使用的是 Cloud Dataproc,但您可以在任何叢集上使用spark-submit
。
任何使用 API 的 Dataproc 叢集都需要“bigquery”或“cloud-platform”範圍。 Dataproc 叢集預設具有「bigquery」範圍,因此啟用的專案中的大多數叢集應該預設工作,例如
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
連接器的最新版本可透過以下連結公開取得:
版本 | 關聯 |
---|---|
火花3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (HTTP 連結) |
火花3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (HTTP 連結) |
火花3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (HTTP 連結) |
火花3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (HTTP 連結) |
火花3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (HTTP 連結) |
火花2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (HTTP 連結) |
斯卡拉2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (HTTP 連結) |
斯卡拉2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (HTTP 連結) |
斯卡拉2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP 連結) |
前六個版本是基於 Java 的連接器,針對基於 Spark 的新資料來源 API(資料來源 API v2)所建構的所有 Scala 版本中的 Spark 2.4/3.1/3.2/3.3/3.4/3.5。
最後兩個連接器是基於 Scala 的連接器,請使用與您的 Spark 安裝相關的 jar,如下所述。
連接器火花 | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
Spark-3.5-bigquery | ✓ | |||||||
Spark-3.4-bigquery | ✓ | ✓ | ||||||
Spark-3.3-bigquery | ✓ | ✓ | ✓ | |||||
Spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
Spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
Spark-2.4-bigquery | ✓ | |||||||
Spark-bigquery-with-dependencies_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
Spark-bigquery-with-dependencies_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
Spark-bigquery-with-dependencies_2.11 | ✓ | ✓ |
連接器 Dataproc 影像 | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | 無伺服器 圖片1.0 | 無伺服器 影像2.0 | 無伺服器 圖2.1 | 無伺服器 圖2.2 |
---|---|---|---|---|---|---|---|---|---|---|
Spark-3.5-bigquery | ✓ | ✓ | ||||||||
Spark-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
Spark-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
Spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
Spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
Spark-2.4-bigquery | ✓ | ✓ | ||||||||
Spark-bigquery-with-dependencies_2.13 | ✓ | ✓ | ✓ | |||||||
Spark-bigquery-with-dependencies_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
Spark-bigquery-with-dependencies_2.11 | ✓ | ✓ |
此連接器也可以從 Maven 中央儲存庫取得。可以使用--packages
選項或spark.jars.packages
配置屬性來使用它。使用以下值
版本 | 連接器神器 |
---|---|
火花3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
火花3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
火花3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
火花3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
火花3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
火花2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
斯卡拉2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
斯卡拉2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
斯卡拉2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
使用映像 2.1 及更高版本建立的 Dataproc 叢集或使用 Dataproc 無伺服器服務的批次隨附內建 Spark BigQuery 連接器。在這種情況下,使用標準的--jars
或--packages
(或者, spark.jars
/ spark.jars.packages
配置)將無濟於事,因為內建連接器優先。
若要使用內建版本以外的其他版本,請執行下列其中一項:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
或--metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
使用不同的 jar 建立群集。 URL 可以指向叢集 Spark 版本的任何有效連接器 JAR。--properties dataproc.sparkBqConnector.version=0.41.0
或--properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
使用不同的 jar 建立批次。 URL 可以指向執行時間 Spark 版本的任何有效連接器 JAR。 您可以針對 API 執行簡單的 PySpark 字數統計,無需編譯,只需執行
Dataproc 映像 1.5 及更高版本
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Dataproc 映像 1.4 及更低版本
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
連接器使用跨語言 Spark SQL 資料來源 API:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
或僅 Scala 隱式 API:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
有關更多信息,請參閱 Python、Scala 和 Java 中的其他程式碼範例。
此連接器可讓您在 BigQuery 上執行任何標準 SQL SELECT 查詢並將其結果直接提取到 Spark Dataframe。這很容易完成,如以下程式碼範例中所述:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
產生結果
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
第二個選項是使用以下query
選項:
df = spark.read.format("bigquery").option("query", sql).load()
請注意,執行速度應該更快,因為只有結果會透過線路傳輸。以類似的方式,查詢可以更有效率地包含 JOIN,然後在 Spark 上執行聯接,或使用其他 BigQuery 功能,例如子查詢、BigQuery 使用者定義函數、通配符表、BigQuery ML 等。
為了使用此功能,必須設定以下配置:
viewsEnabled
必須設定為true
。materializationDataset
必須設定為 GCP 使用者俱有建表權限的資料集。 materializationProject
是可選的。注意:如 BigQuery 文件中所述,查詢的表必須與materializationDataset
位於同一位置。另外,如果SQL statement
中的表格來自parentProject
以外的項目,則使用完全限定的表格名稱,即[project].[dataset].[table]
。
重要提示:此功能是透過在 BigQuery 上執行查詢並將結果儲存到臨時表中來實現的,Spark 將從該臨時表中讀取結果。這可能會增加您的 BigQuery 帳戶的額外費用。
此連接器初步支援從 BigQuery 視圖中讀取。請注意,有一些注意事項:
collect()
或count()
操作之前,此程序也會影響讀取效能。materializationProject
和materializationDataset
選項進行配置。這些選項也可以透過在讀取視圖之前呼叫spark.conf.set(...)
來全域設定。.option("viewsEnabled", "true")
)時設定viewsEnabled選項,或透過呼叫spark.conf.set("viewsEnabled", "true")
全域設定它。materializationDataset
應與視圖位於同一位置。可以使用兩種方法將 DataFrame 寫入 BigQuery:直接方法和間接方法。
在此方法中,資料使用 BigQuery Storage Write API 直接寫入 BigQuery。為了啟用此選項,請將writeMethod
選項設為direct
,如下所示:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
連接器和 BigQuery Storage Write API 完全支援在 APPEND 保存模式和 OVERWRITE 模式(僅日期和範圍分區)下寫入現有分區表(日期分區、攝取時間分區和範圍分區)。直接寫入方法目前不支援使用下面描述的datePartition
、 partitionField
、 partitionType
、 partitionRangeStart
、 partitionRangeEnd
、 partitionRangeInterval
。
重要提示:請參閱 BigQuery Storage Write API 定價的資料擷取定價頁面。
重要提示:請使用0.24.2以上版本進行直接寫入,因為先前的版本有bug,在某些情況下可能會導致表格刪除。
在此方法中,資料首先寫入 GCS,然後將其載入到 BigQuery。必須配置 GCS 儲存桶來指示暫存資料位置。
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
資料使用 Apache Parquet、Apache ORC 或 Apache Avro 格式暫時儲存。
GCS 儲存桶和格式也可以使用 Spark 的 RuntimeConfig 進行全域設置,如下所示:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
將 DataFrame 串流傳輸到 BigQuery 時,每個批次的寫入方式與非串流 DataFrame 相同。請注意,必須指定 HDFS 相容的檢查點位置(例如: path/to/HDFS/dir
或gs://checkpoint-bucket/checkpointDir
)。
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
重要:連接器不會配置 GCS 連接器,以避免與其他 GCS 連接器(如果存在)發生衝突。為了使用連接器的寫入功能,請依照此處的說明在叢集上設定 GCS 連接器。
API 支援多種選項來配置讀取
<style> table#propertytable td, table th { word-break:break-word } </style>財產 | 意義 | 用法 |
---|---|---|
table | BigQuery 表的格式為[[project:]dataset.]table 。建議使用load() / save() 的path 參數來代替。此選項已被棄用,並將在未來版本中刪除。(已棄用) | 讀/寫 |
dataset | 包含表格的資料集。此選項應與標準表和視圖一起使用,但不應在載入查詢結果時使用。 (可選,除非 table 中省略) | 讀/寫 |
project | 表的 Google Cloud 項目 ID。此選項應與標準表和視圖一起使用,但不應在載入查詢結果時使用。 (可選。預設為正在使用的服務帳戶的項目) | 讀/寫 |
parentProject | 要為匯出計費的表格的 Google Cloud 項目 ID。 (可選。預設為正在使用的服務帳戶的項目) | 讀/寫 |
maxParallelism | 將資料分割成的最大分區數。如果 BigQuery 認為資料夠小,則實際數量可能會更少。如果沒有足夠的執行器來為每個分割區安排一個讀取器,則某些分割區可能是空的。 重要提示:舊參數( parallelism )仍然受支持,但處於棄用模式。它將在連接器的 1.0 版本中刪除。(可選。預設為 PreferredMinParallelism 和 20,000 中較大的一個)。 | 讀 |
preferredMinParallelism | 將資料分割成的首選最小分區數。如果 BigQuery 認為資料夠小,則實際數量可能會更少。如果沒有足夠的執行器來為每個分割區安排一個讀取器,則某些分割區可能是空的。 (可選。預設為應用程式預設並行度和 maxParallelism 的 3 倍中的最小值。) | 讀 |
viewsEnabled | 使連接器能夠從視圖而不僅僅是表中讀取。請在啟動此選項之前閱讀相關部分。 (可選。預設為 false ) | 讀 |
materializationProject | 將在其中建立物化視圖的項目 ID (可選。預設為視圖的項目 ID) | 讀 |
materializationDataset | 將在其中建立物化視圖的資料集。此資料集應與視圖或查詢表位於同一位置。 (可選。預設為視圖的資料集) | 讀 |
materializationExpirationTimeInMinutes | 儲存檢視或查詢的特定化資料的臨時表的過期時間(以分鐘為單位)。請注意,由於使用本機快取以及為了減少 BigQuery 計算,連接器可能會重複使用臨時表,因此非常低的值可能會導致錯誤。該值必須是正整數。 (可選。預設為 1440,即 24 小時) | 讀 |
readDataFormat | 用於從 BigQuery 讀取的資料格式。選項: ARROW 、 AVRO (可選。預設為 ARROW ) | 讀 |
optimizedEmptyProjection | 連接器使用最佳化的空投影(選擇不含任何列的)邏輯,用於count() 執行。此邏輯直接從表元數據獲取數據,或在存在過濾器的情況下執行高效的「SELECT COUNT(*) WHERE...」。您可以將此選項設為false 來取消使用此邏輯。(可選,預設為 true ) | 讀 |
pushAllFilters | 如果設定為true ,連接器會將 Spark 可以委託給 BigQuery Storage API 的所有篩選器推送。這減少了需要從 BigQuery Storage API 伺服器傳送到 Spark 用戶端的資料量。此選項已被棄用,並將在未來版本中刪除。(可選,預設為 true )(已棄用) | 讀 |
bigQueryJobLabel | 可用於向連接器發起的查詢新增標籤並載入 BigQuery 作業。可以設定多個標籤。 (選修的) | 讀 |
bigQueryTableLabel | 可用於在寫入表格時向表格新增標籤。可以設定多個標籤。 (選修的) | 寫 |
traceApplicationName | 用於追蹤 BigQuery 儲存讀取和寫入會話的應用程式名稱。需要設定應用程式名稱才能設定會話上的追蹤 ID。 (選修的) | 讀 |
traceJobId | 用於追蹤 BigQuery 儲存讀取和寫入會話的作業 ID。 (可選,預設 Dataproc 作業 ID 存在,否則使用 Spark 應用程式 ID) | 讀 |
createDisposition | 指定是否允許作業建立新表。允許的值為:
(可選。預設為 CREATE_IF_NEEDED)。 | 寫 |
writeMethod | 控制將資料寫入 BigQuery 的方法。可用值包括direct 使用 BigQuery Storage Write API 和indirect ,即首先將資料寫入 GCS,然後觸發 BigQuery 載入操作。在這裡查看更多內容(可選,預設為 indirect ) | 寫 |
writeAtLeastOnce | 確保資料至少寫入 BigQuery 一次。這是比恰好一次的保證要少的保證。適用於小批量連續寫入資料的串流場景。 (可選。預設為 false )僅支援“DIRECT”寫入方法,且模式不支援“Overwrite”。 | 寫 |
temporaryGcsBucket | 在將資料載入到 BigQuery 之前暫時保存資料的 GCS 儲存桶。除非在 Spark 配置 ( spark.conf.set(...) ) 中設置,否則是必要的。“DIRECT”寫入方法不支援。 | 寫 |
persistentGcsBucket | 在將資料載入到 BigQuery 之前保存資料的 GCS 儲存桶。如果收到通知,則將資料寫入 BigQuery 後不會刪除資料。 “DIRECT”寫入方法不支援。 | 寫 |
persistentGcsPath | 在載入到 BigQuery 之前保存資料的 GCS 路徑。僅與persistentGcsBucket 一起使用。“DIRECT”寫入方法不支援。 | 寫 |
intermediateFormat | 資料載入到 BigQuery 之前的格式,值可以是「parquet」、「orc」或「avro」。為了使用Avro格式,必須在運行時添加spark-avro包。 (可選。預設為 parquet )。只寫。僅支援“間接”寫入方法。 | 寫 |
useAvroLogicalTypes | 從 Avro 載入時(`.option("intermediateFormat", "avro")`),BigQuery 使用底層 Avro 類型而不是邏輯類型[預設](https://cloud.google.com/bigquery/docs/載入資料雲存儲-avro#邏輯類型)。提供此選項會將 Avro 邏輯類型轉換為其對應的 BigQuery 資料類型。 (可選。預設為 false )。只寫。 | 寫 |
datePartition | 資料將寫入的日期分區。應該是以YYYYMMDD 格式給出的日期字串。可用於覆蓋單一分區的數據,如下所示:
(選修的)。只寫。 也可以與不同的分區類型一起使用,例如: 時間: YYYYMMDDHH 月份: YYYYMM 年份: YYYY “DIRECT”寫入方法不支援。 | 寫 |
partitionField | 如果指定了該字段,則表格將按該字段進行分區。 對於時間分區,請與選項「partitionType」一起指定。 對於整數範圍分區,請指定 3 個選項:「partitionRangeStart」、「partitionRangeEnd」、「partitionRangeInterval」。 對於時間分區,該欄位必須是頂級 TIMESTAMP 或 DATE 欄位;對於整數範圍分區,該欄位必須是 INT64。它的模式必須是NULLABLE或REQUIRED 。如果沒有為時間分區表設定該選項,則該表將按偽列分區,透過 '_PARTITIONTIME' as TIMESTAMP 類型或'_PARTITIONDATE' as DATE 類型參考。(選修的)。 “DIRECT”寫入方法不支援。 | 寫 |
partitionExpirationMs | 表中分區的儲存保留的毫秒數。分區中的儲存將具有其分區時間加上該值的過期時間。 (選修的)。 “DIRECT”寫入方法不支援。 | 寫 |
partitionType | 用於指定時間分區。 支援的類型有: HOUR, DAY, MONTH, YEAR 對於要進行時間分區的目標表,此選項是必要的。 (可選。如果指定了 PartitionField,則預設為 DAY)。 “DIRECT”寫入方法不支援。 | 寫 |
partitionRangeStart 、 partitionRangeEnd 、 partitionRangeInterval | 用於指定整數範圍分割區。 對於要進行整數範圍分割的目標表來說,這些選項是必要的。 必須指定所有 3 個選項。 “DIRECT”寫入方法不支援。 | 寫 |
clusteredFields | 一串不重複的頂級列,以逗號分隔。 (選修的)。 | 寫 |
allowFieldAddition | 將 ALLOW_FIELD_ADDITION SchemaUpdateOption 新增至 BigQuery LoadJob。允許的值為true 和false 。(可選。預設為 false )。僅由“INDIRECT”寫入方法支援。 | 寫 |
allowFieldRelaxation | 將 ALLOW_FIELD_RELAXATION SchemaUpdateOption 新增至 BigQuery LoadJob。允許的值為true 和false 。(可選。預設為 false )。僅由“INDIRECT”寫入方法支援。 | 寫 |
proxyAddress | 代理伺服器的位址。代理必須是 HTTP 代理,位址應採用「主機:連接埠」格式。也可以在 Spark 配置 ( spark.conf.set(...) ) 或 Hadoop 配置 ( fs.gs.proxy.address ) 中設定。(可選。僅當透過代理連接到 GCP 時才需要。) | 讀/寫 |
proxyUsername | 用於連接到代理的用戶名。也可以在 Spark 配置 ( spark.conf.set(...) ) 或 Hadoop 配置 ( fs.gs.proxy.username ) 中設定。(可選。僅當透過具有身份驗證的代理連接到 GCP 時才需要。) | 讀/寫 |
proxyPassword | 用於連接到代理的密碼。也可以在 Spark 配置 ( spark.conf.set(...) ) 或 Hadoop 配置 ( fs.gs.proxy.password ) 中設定。(可選。僅當透過具有身份驗證的代理連接到 GCP 時才需要。) | 讀/寫 |
httpMaxRetry | 對 BigQuery 的低階 HTTP 請求的最大重試次數。也可以在 Spark 配置 ( spark.conf.set("httpMaxRetry", ...) ) 或 Hadoop 配置 ( fs.gs.http.max.retry ) 中設定。(可選。預設值為 10) | 讀/寫 |
httpConnectTimeout | 與 BigQuery 建立連線的逾時時間(以毫秒為單位)。也可以在 Spark 配置 ( spark.conf.set("httpConnectTimeout", ...) ) 或 Hadoop 配置 ( fs.gs.http.connect-timeout ) 中設定。(可選。預設為 60000 毫秒。0 表示無限超時,負數表示 20000) | 讀/寫 |
httpReadTimeout | 從已建立的連線讀取資料的逾時(以毫秒為單位)。也可以在 Spark 配置 ( spark.conf.set("httpReadTimeout", ...) ) 或 Hadoop 配置 ( fs.gs.http.read-timeout ) 中設定。(可選。預設為 60000 毫秒。0 表示無限超時,負數表示 20000) | 讀 |
arrowCompressionCodec | 使用 Arrow 格式從 BigQuery 表讀取時的壓縮編解碼器。選項: ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED 。使用 Java 時建議的壓縮編解碼器是ZSTD 。(可選。預設為 COMPRESSION_UNSPECIFIED ,這表示不會使用壓縮) | 讀 |
responseCompressionCodec | 壓縮編解碼器用於壓縮 ReadRowsResponse 資料。選項: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED 、 RESPONSE_COMPRESSION_CODEC_LZ4 (可選。預設為 RESPONSE_COMPRESSION_CODEC_UNSPECIFIED ,這表示不會使用壓縮) | 讀 |
cacheExpirationTimeInMinutes | 儲存查詢資訊的記憶體快取的過期時間。 若要停用緩存,請將值設為 0。 (可選。預設為 15 分鐘) | 讀 |
enableModeCheckForSchemaFields | 在直接寫入期間,檢查目標模式中每個欄位的模式是否等於對應來源欄位模式中的模式。 預設值為 true,即預設進行檢查。如果設定為 false,則忽略模式檢查。 | 寫 |
enableListInference | 指示當模式為 Parquet 時是否專門使用架構推論 (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions)。 預設為 false。 | 寫 |
bqChannelPoolSize | BigQueryReadClient 所建立的 gRPC 通道池的(固定)大小。 為了獲得最佳效能,應至少將其設定為叢集執行器上的核心數量。 | 讀 |
createReadSessionTimeoutInSeconds | 讀取表格時建立 ReadSession 的逾時時間(以秒為單位)。 對於非常大的表,應該增加該值。 (可選。預設為 600 秒) | 讀 |
queryJobPriority | 從 BigQuery 查詢讀取資料時為作業設定的優先權。允許的值為:
(可選。預設為 INTERACTIVE ) | 讀/寫 |
destinationTableKmsKeyName | 描述將用於保護目標 BigQuery 表的 Cloud KMS 加密金鑰。與您的專案關聯的 BigQuery 服務帳號需要存取此加密金鑰。有關將 CMEK 與 BigQuery 結合使用的更多信息,請參閱[此處](https://cloud.google.com/bigquery/docs/customer-management-encryption#key_resource_id)。 注意:只有連接器建立的表才會被金鑰加密。僅透過設定此選項不會對預先存在的未加密表進行加密。 (選修的) | 寫 |
allowMapTypeConversion | 當記錄有兩個欄位名稱分別為key 和value 子欄位時,布林配置用於停用從 BigQuery 記錄到 Spark MapType 的轉換。預設值為true ,允許轉換。(選修的) | 讀 |
spark.sql.sources.partitionOverwriteMode | 配置為在表進行範圍/時間分區時指定寫入時的覆蓋模式。目前支援兩種模式: STATIC 和DYNAMIC 。在STATIC 模式下,整個表都會被覆蓋。在DYNAMIC 模式下,資料將被現有表的分區覆寫。預設值為STATIC 。(選修的) | 寫 |
enableReadSessionCaching | 用於停用讀取會話快取的布林配置。快取 BigQuery 讀取會話以實現更快的 Spark 查詢規劃。預設值為true 。(選修的) | 讀 |
readSessionCacheDurationMins | 配置以分鐘為單位設定讀取會話快取持續時間。僅當enableReadSessionCaching 為true (預設)時才有效。允許指定快取讀取會話的持續時間。最大允許值為300 。預設值為5 。(選修的) | 讀 |
bigQueryJobTimeoutInMinutes | 配置以設定 BigQuery 作業逾時(以分鐘為單位)。預設值為360 分鐘。(選修的) | 讀/寫 |
snapshotTimeMillis | 以毫秒為單位指定的時間戳,用於讀取表快照。預設情況下,未設定此項,並且會讀取表格的最新版本。 (選修的) | 讀 |
bigNumericDefaultPrecision | BigNumeric 欄位的替代預設精確度,因為 BigQuery 預設值對於 Spark 來說太寬。值可以介於 1 和 38 之間。請注意,如果實際資料的精度超過指定的精度,可能會導致資料遺失。 (選修的) | 讀/寫 |
bigNumericDefaultScale | BigNumeric 欄位的替代預設比例。值可以介於 0 到 38 之間,且小於 bigNumericFieldsPrecision。僅當欄位具有未參數化的 BigNumeric 類型時,才使用此預設值。請注意,如果實際資料規模大於指定規模,可能會導致資料遺失。 (選修的) | 讀/寫 |
也可以使用spark-submit
的--conf
參數或gcloud dataproc submit spark
的--properties
參數在程式碼外部設定選項。為了使用它,請在前面加上前綴spark.datasource.bigquery.
對於任何選項,例如, spark.conf.set("temporaryGcsBucket", "some-bucket")
也可以設定為--conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
。
除DATETIME
和TIME
之外,所有 BigQuery 資料類型都定向映射到對應的 Spark SQL 資料類型。以下是所有映射:
BigQuery 標準 SQL 資料類型 | 星火SQL 資料類型 | 筆記 |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | 請參閱數字和 BigNumeric 支持 |
BIGNUMERIC | DecimalType | 請參閱數字和 BigNumeric 支持 |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark 沒有 DATETIME 類型。 Spark 字串可以寫入現有的 BQ DATETIME 資料列,前提是它採用 BQ DATETIME 文字的格式。 * 對於 Spark 3.4+,BQ DATETIME 被讀取為 Spark 的 TimestampNTZ 類型,即 java LocalDateTime |
TIME | LongType 、 StringType * | Spark 沒有 TIME 類型。產生的 long 表示自午夜以來的微秒數,可以安全地轉換為 TimestampType,但這會導致日期被推斷為當天。因此,時間可以保留為長,使用者可以根據需要進行投射。 當轉換為時間戳記 TIME 時,與 DATETIME 具有相同的時區問題 * Spark 字串可以寫入現有的 BQ TIME 列,前提是它採用 BQ TIME 文字的格式。 |
JSON | StringType | Spark 沒有 JSON 類型。這些值被讀取為字串。要將 JSON 寫回 BigQuery,需要滿足以下條件:
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery 沒有 MAP 類型,因此與 Apache Avro 和 BigQuery Load 作業等其他轉換類似,連接器將 Spark Map 轉換為 REPEATED STRUCT<key,value>。這意味著雖然可以寫入和讀取地圖,但不支援在 BigQuery 上運行使用地圖語義的 SQL。若要使用 BigQuery SQL 引用地圖的值,請查看 BigQuery 文件。由於這些不相容性,存在一些限制:
|
支援 Spark ML Vector 和 Matrix,包括它們的密集和稀疏版本。資料保存為 BigQuery RECORD。請注意,在字段的描述中添加了一個後綴,其中包括字段的 Spark 類型。
為了將這些類型寫入 BigQuery,請使用 ORC 或 Avro 中間格式,並將它們作為 Row 的欄位(即不是結構中的欄位)。
BigQuery的BigNumeric的精度為76.76(第77位為部分),小數位數為38。大於38的BigNumeric字段。一旦 Spark 限制被更新,連接器也會隨之更新。
Spark Decimal/BigQuery Numeric 轉換嘗試保留類型的參數化,即NUMERIC(10,2)
將轉換為Decimal(10,2)
,反之亦然。但請注意,有時參數會遺失。這意味著參數將恢復為預設值 - NUMERIC (38,9) 和 BIGNUMERIC(76,38)。這意味著目前僅支援從標準表讀取 BigNumeric,而不支援從 BigQuery 視圖或從 BigQuery 查詢讀取資料時讀取。
連接器自動計算列並下推過濾 DataFrame 的SELECT
語句,例如
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
過濾到列word
並下推謂詞過濾器word = 'hamlet' or word = 'Claudius'
。
如果您不希望向 BigQuery 發出多個讀取請求,您可以在過濾之前快取 DataFrame,例如:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
您也可以手動指定filter
選項,這將覆蓋自動下推,Spark 將在客戶端中完成其餘的過濾。
偽列 _PARTITIONDATE 和 _PARTITIONTIME 不是表架構的一部份。因此,為了按分區表的分區進行查詢,請勿使用上面所示的 where() 方法。相反,請按以下方式新增篩選器選項:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
預設情況下,連接器在正在讀取的表中每 400MB 建立一個分區(過濾之前)。這應大致對應於 BigQuery Storage API 支援的最大讀取器數量。這可以使用maxParallelism
屬性明確配置。 BigQuery 可能會根據伺服器限制限制分區數量。
為了支援追蹤 BigQuery 資源的使用情況,連接器提供以下選項來標記 BigQuery 資源:
連接器可以啟動 BigQuery 載入和查詢作業。在作業中新增標籤的方式如下:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
這將建立標籤cost_center
= analytics
和usage
= nightly_etl
。
用於註釋讀取和寫入會話。追蹤 ID 的格式為Spark:ApplicationName:JobID
。這是一個選擇加入選項,要使用它,使用者需要設定traceApplicationName
屬性。 JobID 由 Dataproc 作業 ID 自動生成,並回退到 Spark 應用程式 ID(例如application_1648082975639_0001
)。可以透過設定traceJobId
選項來覆寫作業ID。請注意,追蹤 ID 的總長度不能超過 256 個字元。
即使未安裝在 Spark 叢集上,該連接器也可以在 Jupyter 筆記本中使用。可以使用以下程式碼將其新增為外部 jar:
Python:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
斯卡拉:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
如果 Spark 叢集使用 Scala 2.12(對於 Spark 2.4.x 是可選的,在 3.0.x 中是強制的),則相關包為 com.google.cloud.spark:spark-bigquery-with-dependency_ 2.12 :0.41. 0。若要了解使用的是哪個 Scala 版本,請執行以下程式碼:
Python:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
斯卡拉:
scala . util . Properties . versionString
除非您希望使用隱式 Scala API spark.read.bigquery("TABLE_ID")
,否則無需針對連接器進行編譯。
若要將連接器包含在您的專案中:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark 填入了許多指標,最終用戶可以在 Spark 歷史記錄頁面中找到這些指標。但所有這些指標都與 Spark 相關,它們是隱式收集的,無需連接器進行任何更改。但是,從 BigQuery 填充並且目前在應用程式日誌中可見的指標很少,可以在驅動程式/執行程式日誌中讀取。
從 Spark 3.2 開始,spark 提供了 API 來在 Spark UI 頁面中公開自訂指標 https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /CustomMetric.html
目前,使用此 API,連接器在讀取期間公開以下 bigquery 指標
<style> table#metricstable td, table th { word-break:break-word } </style>指標名稱 | 描述 |
---|---|
bytes read | 讀取的 BigQuery 位元組數 |
rows read | 讀取的 BigQuery 行數 |
scan time | 要求在所有執行者中獲得的讀取行響應之間花費的時間,以毫秒為單位。 |
parse time | 分析行所花費的時間以毫秒為單位讀取所有執行者。 |
spark time | 在Spark上花費的時間來處理查詢(即,除了掃描和解析),所有執行者以毫秒為單位。 |
注意:要使用Spark UI頁面中的指標,您需要確保在啟動歷史程式伺服器之前, spark-bigquery-metrics-0.41.0.jar
是類別路徑,並且連接器版本為spark-3.2
或更高。
請參閱BigQuery定價文件。
您可以用maxParallelism
屬性手動設定分區數。 BigQuery可能提供的分區少於您要求的。請參閱配置分區。
在Spark閱讀後,您也可以始終重新分配。
如果分區太多,則可以超過CreateWritestream或吞吐量配額。之所以發生這種情況,是因為在每個分區中的資料串列處理時,可以在火花叢集中的不同節點上並行處理獨立的分區。通常,為了確保最大持續的吞吐量,您應該提交配額增加請求。但是,您也可以手動減少透過在DataFrame上呼叫coalesce
來減輕此問題的分割區數量。
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
經驗法則是具有至少1GB資料的單一分區手把。
另請注意,執行writeAtLeastOnce
屬性的作業將不會遇到CreateWritestream配額錯誤。
該連接器需要一個GoogleCredentials的實例才能連接到BigQuery API。有多種選擇可以提供:
GOOGLE_APPLICATION_CREDENTIALS
環境變數載入JSON金鑰。 // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
選項中提供。 AccessTokenProvider
必須以Java或其他JVM語言(例如Scala或Kotlin)實作。它必須具有一個no-arg建構函數,也必須一個建構子接受一個java.util.String
參數。可以使用gcpAccessTokenProviderConfig
選項提供此設定參數。如果未提供此功能,則將呼叫No-Arg構造函數。包含實現的罐子應在集群的類別路徑上。 // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
可以為特定使用者名稱和群組名稱配置服務帳戶的模擬,或預設使用下列屬性為所有使用者配置:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(預設)
特定使用者的服務帳戶模仿。
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(預設設定)
特定群組的服務帳戶模仿。
gcpImpersonationServiceAccount
(預設未設定)
所有使用者的預設服務帳戶模仿。
如果設定了上述任何屬性,則指定的服務帳戶將透過在存取BigQuery時產生短暫的憑證來模仿。
如果設定了多個屬性,則與使用者名稱關聯的服務帳戶將優先於與符合使用者和群組的群組名稱相關的服務帳戶,而該群組名稱將優先於預設服務帳戶。
對於不需要存取令牌刷新的更簡單的應用程序,另一種選擇是將存取權杖作為gcpAccessToken
配置選項傳遞。您可以透過執行gcloud auth application-default print-access-token
來取得存取權杖。
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
重要的是:需要在Java或其他JVM語言(例如Scala或Kotlin)中實現CredentialsProvider
和AccessTokenProvider
。包含實現的罐子應在集群的類別路徑上。
注意:僅應提供以上選項之一。
若要連線到向前代理並驗證使用者憑證,請配置以下選項。
proxyAddress
:代理伺服器的位址。代理必須是HTTP代理,位址應在host:port
格式。
proxyUsername
:用於連接代理的使用者名稱。
proxyPassword
:用於連接到代理的密碼。
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
同一代理參數也可以使用Spark的RuntimeConfig進行全域設定:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
您也可以在Hadoop配置中設定以下內容。
fs.gs.proxy.address
(類似「 proxyAddress」), fs.gs.proxy.username
(類似「 proxyusername」)和fs.gs.proxy.password
(類似「 proxypassword」)。
如果在多個位置設定相同的參數,則優先順序如下:
選項(“鍵”,“ value”)> spark.conf> hadoop配置