该连接器支持将 Google BigQuery 表读入 Spark 的 DataFrame,以及将 DataFrame 写回 BigQuery。这是通过使用 Spark SQL 数据源 API 与 BigQuery 通信来完成的。
Storage API 通过 gRPC 直接从 BigQuery 并行传输数据,无需使用 Google Cloud Storage 作为中介。
与使用以前的基于导出的读取流程相比,它具有许多优点,通常会带来更好的读取性能:
它不会在 Google Cloud Storage 中留下任何临时文件。使用 Arrow 或 Avro 有线格式直接从 BigQuery 服务器读取行。
新的 API 允许列和谓词过滤以仅读取您感兴趣的数据。
由于 BigQuery 由列式数据存储支持,因此它可以高效地流式传输数据,而无需读取所有列。
存储 API 支持谓词过滤器的任意下推。连接器版本 0.8.0-beta 及更高版本支持将任意过滤器下推到 Bigquery。
Spark 中有一个已知问题,不允许在嵌套字段上下推过滤器。例如,像address.city = "Sunnyvale"
这样的过滤器将不会下推到 Bigquery。
API 会在读取器之间重新平衡记录,直到它们全部完成。这意味着所有 Map 阶段将几乎同时完成。请参阅此博客文章,了解如何在 Google Cloud Dataflow 中类似地使用动态分片。
有关更多详细信息,请参阅配置分区。
请遵循这些说明。
如果您没有 Apache Spark 环境,您可以使用预配置的身份验证创建 Cloud Dataproc 集群。以下示例假设您使用的是 Cloud Dataproc,但您可以在任何集群上使用spark-submit
。
任何使用 API 的 Dataproc 集群都需要“bigquery”或“cloud-platform”范围。 Dataproc 集群默认具有“bigquery”范围,因此启用的项目中的大多数集群应该默认工作,例如
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
连接器的最新版本可通过以下链接公开获取:
版本 | 关联 |
---|---|
火花3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (HTTP 链接) |
火花3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (HTTP 链接) |
火花3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (HTTP 链接) |
火花3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (HTTP 链接) |
火花3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (HTTP 链接) |
火花2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (HTTP 链接) |
斯卡拉2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (HTTP 链接) |
斯卡拉2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (HTTP 链接) |
斯卡拉2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP 链接) |
前六个版本是基于 Java 的连接器,针对基于 Spark 的新数据源 API(数据源 API v2)构建的所有 Scala 版本中的 Spark 2.4/3.1/3.2/3.3/3.4/3.5。
最后两个连接器是基于 Scala 的连接器,请使用与您的 Spark 安装相关的 jar,如下所述。
连接器火花 | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
Spark-3.5-bigquery | ✓ | |||||||
Spark-3.4-bigquery | ✓ | ✓ | ||||||
Spark-3.3-bigquery | ✓ | ✓ | ✓ | |||||
Spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
Spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
Spark-2.4-bigquery | ✓ | |||||||
Spark-bigquery-with-dependencies_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
Spark-bigquery-with-dependencies_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
Spark-bigquery-with-dependencies_2.11 | ✓ | ✓ |
连接器 Dataproc 图像 | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | 无服务器 图片1.0 | 无服务器 图像2.0 | 无服务器 图2.1 | 无服务器 图2.2 |
---|---|---|---|---|---|---|---|---|---|---|
Spark-3.5-bigquery | ✓ | ✓ | ||||||||
Spark-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
Spark-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
Spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
Spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
Spark-2.4-bigquery | ✓ | ✓ | ||||||||
Spark-bigquery-with-dependencies_2.13 | ✓ | ✓ | ✓ | |||||||
Spark-bigquery-with-dependencies_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
Spark-bigquery-with-dependencies_2.11 | ✓ | ✓ |
该连接器也可以从 Maven 中央存储库获取。可以使用--packages
选项或spark.jars.packages
配置属性来使用它。使用以下值
版本 | 连接器神器 |
---|---|
火花3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
火花3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
火花3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
火花3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
火花3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
火花2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
斯卡拉2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
斯卡拉2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
斯卡拉2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
使用映像 2.1 及更高版本创建的 Dataproc 集群或使用 Dataproc 无服务器服务的批处理附带内置 Spark BigQuery 连接器。在这种情况下,使用标准的--jars
或--packages
(或者, spark.jars
/ spark.jars.packages
配置)将无济于事,因为内置连接器优先。
要使用内置版本以外的其他版本,请执行以下操作之一:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
或--metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
使用不同的 jar 创建集群。 URL 可以指向集群 Spark 版本的任何有效连接器 JAR。--properties dataproc.sparkBqConnector.version=0.41.0
或--properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
使用不同的 jar 创建批处理。 URL 可以指向运行时 Spark 版本的任何有效连接器 JAR。 您可以针对 API 运行简单的 PySpark 字数统计,无需编译,只需运行
Dataproc 映像 1.5 及更高版本
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Dataproc 映像 1.4 及更低版本
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
连接器使用跨语言 Spark SQL 数据源 API:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
或仅 Scala 隐式 API:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
有关更多信息,请参阅 Python、Scala 和 Java 中的其他代码示例。
该连接器允许您在 BigQuery 上运行任何标准 SQL SELECT 查询并将其结果直接提取到 Spark Dataframe。这很容易完成,如以下代码示例中所述:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
产生结果
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
第二个选项是使用如下query
选项:
df = spark.read.format("bigquery").option("query", sql).load()
请注意,执行速度应该更快,因为只有结果通过线路传输。以类似的方式,查询可以更有效地包含 JOIN,然后在 Spark 上运行联接,或者使用其他 BigQuery 功能,例如子查询、BigQuery 用户定义函数、通配符表、BigQuery ML 等。
为了使用此功能,必须设置以下配置:
viewsEnabled
必须设置为true
。materializationDataset
必须设置为 GCP 用户具有建表权限的数据集。 materializationProject
是可选的。注意:如 BigQuery 文档中所述,查询的表必须与materializationDataset
位于同一位置。另外,如果SQL statement
中的表来自parentProject
以外的项目,则使用完全限定的表名称,即[project].[dataset].[table]
。
重要提示:此功能是通过在 BigQuery 上运行查询并将结果保存到临时表中来实现的,Spark 将从该临时表中读取结果。这可能会增加您的 BigQuery 帐户的额外费用。
该连接器初步支持从 BigQuery 视图中读取。请注意,有一些注意事项:
collect()
或count()
操作之前,此过程也会影响读取性能。materializationProject
和materializationDataset
选项进行配置。这些选项也可以通过在读取视图之前调用spark.conf.set(...)
来全局设置。.option("viewsEnabled", "true")
)时设置viewsEnabled选项,或者通过调用spark.conf.set("viewsEnabled", "true")
全局设置它。materializationDataset
应与视图位于同一位置。可以使用两种方法将 DataFrame 写入 BigQuery:直接方法和间接方法。
在此方法中,数据使用 BigQuery Storage Write API 直接写入 BigQuery。为了启用此选项,请将writeMethod
选项设置为direct
,如下所示:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
连接器和 BigQuery Storage Write API 完全支持在 APPEND 保存模式和 OVERWRITE 模式(仅日期和范围分区)下写入现有分区表(日期分区、摄取时间分区和范围分区)。直接写入方法目前不支持使用下面描述的datePartition
、 partitionField
、 partitionType
、 partitionRangeStart
、 partitionRangeEnd
、 partitionRangeInterval
。
重要提示:请参阅有关 BigQuery Storage Write API 定价的数据提取定价页面。
重要提示:请使用0.24.2及以上版本进行直接写入,因为之前的版本存在bug,在某些情况下可能会导致表删除。
在此方法中,数据首先写入 GCS,然后将其加载到 BigQuery。必须配置 GCS 存储桶来指示临时数据位置。
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
数据使用 Apache Parquet、Apache ORC 或 Apache Avro 格式临时存储。
GCS 存储桶和格式也可以使用 Spark 的 RuntimeConfig 进行全局设置,如下所示:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
将 DataFrame 流式传输到 BigQuery 时,每个批次的写入方式与非流式 DataFrame 相同。请注意,必须指定 HDFS 兼容的检查点位置(例如: path/to/HDFS/dir
或gs://checkpoint-bucket/checkpointDir
)。
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
重要提示:连接器不会配置 GCS 连接器,以避免与其他 GCS 连接器(如果存在)发生冲突。为了使用连接器的写入功能,请按照此处的说明在集群上配置 GCS 连接器。
API 支持多种选项来配置读取
<style> table#propertytable td, table th { word-break:break-word } </style>财产 | 意义 | 用法 |
---|---|---|
table | BigQuery 表的格式为[[project:]dataset.]table 。建议使用load() / save() 的path 参数代替。此选项已被弃用,并将在未来版本中删除。(已弃用) | 读/写 |
dataset | 包含表的数据集。此选项应与标准表和视图一起使用,但不应在加载查询结果时使用。 (可选,除非 table 中省略) | 读/写 |
project | 表的 Google Cloud 项目 ID。此选项应与标准表和视图一起使用,但不应在加载查询结果时使用。 (可选。默认为正在使用的服务帐户的项目) | 读/写 |
parentProject | 要为导出计费的表的 Google Cloud 项目 ID。 (可选。默认为正在使用的服务帐户的项目) | 读/写 |
maxParallelism | 将数据分割成的最大分区数。如果 BigQuery 认为数据足够小,则实际数量可能会更少。如果没有足够的执行器来为每个分区安排一个读取器,则某些分区可能是空的。 重要提示:旧参数( parallelism )仍然受支持,但处于弃用模式。它将在连接器的 1.0 版本中删除。(可选。默认为 PreferredMinParallelism 和 20,000 中较大的一个)。) | 读 |
preferredMinParallelism | 将数据分割成的首选最小分区数。如果 BigQuery 认为数据足够小,则实际数量可能会更少。如果没有足够的执行器来为每个分区安排一个读取器,则某些分区可能是空的。 (可选。默认为应用程序默认并行度和 maxParallelism 的 3 倍中的最小值。) | 读 |
viewsEnabled | 使连接器能够从视图而不仅仅是表中读取。请在激活此选项之前阅读相关部分。 (可选。默认为 false ) | 读 |
materializationProject | 将在其中创建物化视图的项目 ID (可选。默认为视图的项目 ID) | 读 |
materializationDataset | 将在其中创建物化视图的数据集。该数据集应与视图或查询表位于同一位置。 (可选。默认为视图的数据集) | 读 |
materializationExpirationTimeInMinutes | 保存视图或查询的具体化数据的临时表的过期时间(以分钟为单位)。请注意,由于使用本地缓存以及为了减少 BigQuery 计算,连接器可能会重复使用临时表,因此非常低的值可能会导致错误。该值必须是正整数。 (可选。默认为 1440,即 24 小时) | 读 |
readDataFormat | 用于从 BigQuery 读取的数据格式。选项: ARROW 、 AVRO (可选。默认为 ARROW ) | 读 |
optimizedEmptyProjection | 连接器使用优化的空投影(选择不带任何列的)逻辑,用于count() 执行。该逻辑直接从表元数据获取数据,或者在存在过滤器的情况下执行高效的“SELECT COUNT(*) WHERE...”。您可以通过将此选项设置为false 来取消使用此逻辑。(可选,默认为 true ) | 读 |
pushAllFilters | 如果设置为true ,连接器会将 Spark 可以委托给 BigQuery Storage API 的所有筛选器推送。这减少了需要从 BigQuery Storage API 服务器发送到 Spark 客户端的数据量。此选项已被弃用,并将在未来版本中删除。(可选,默认为 true )(已弃用) | 读 |
bigQueryJobLabel | 可用于向连接器发起的查询添加标签并加载 BigQuery 作业。可以设置多个标签。 (选修的) | 读 |
bigQueryTableLabel | 可用于在写入表时向表添加标签。可以设置多个标签。 (选修的) | 写 |
traceApplicationName | 用于跟踪 BigQuery 存储读取和写入会话的应用程序名称。需要设置应用程序名称才能设置会话上的跟踪 ID。 (选修的) | 读 |
traceJobId | 用于跟踪 BigQuery 存储读取和写入会话的作业 ID。 (可选,默认 Dataproc 作业 ID 存在,否则使用 Spark 应用程序 ID) | 读 |
createDisposition | 指定是否允许作业创建新表。允许的值为:
(可选。默认为 CREATE_IF_NEEDED)。 | 写 |
writeMethod | 控制将数据写入 BigQuery 的方法。可用值包括direct 使用 BigQuery Storage Write API 和indirect ,即首先将数据写入 GCS,然后触发 BigQuery 加载操作。在这里查看更多内容(可选,默认为 indirect ) | 写 |
writeAtLeastOnce | 确保数据至少写入 BigQuery 一次。这是比恰好一次的保证要少的保证。适用于小批量连续写入数据的流式场景。 (可选。默认为 false )仅支持“DIRECT”写入方法,且模式不支持“Overwrite”。 | 写 |
temporaryGcsBucket | 在将数据加载到 BigQuery 之前临时保存数据的 GCS 存储桶。必需的,除非在 Spark 配置 ( spark.conf.set(...) ) 中设置。“DIRECT”写入方法不支持。 | 写 |
persistentGcsBucket | 在将数据加载到 BigQuery 之前保存数据的 GCS 存储桶。如果收到通知,则将数据写入 BigQuery 后不会删除数据。 “DIRECT”写入方法不支持。 | 写 |
persistentGcsPath | 在加载到 BigQuery 之前保存数据的 GCS 路径。仅与persistentGcsBucket 一起使用。“DIRECT”写入方法不支持。 | 写 |
intermediateFormat | 数据加载到 BigQuery 之前的格式,值可以是“parquet”、“orc”或“avro”。为了使用Avro格式,必须在运行时添加spark-avro包。 (可选。默认为 parquet )。只写。仅支持“间接”写入方法。 | 写 |
useAvroLogicalTypes | 从 Avro 加载时(`.option("intermediateFormat", "avro")`),BigQuery 使用底层 Avro 类型而不是逻辑类型[默认](https://cloud.google.com/bigquery/docs/加载数据云存储-avro#逻辑类型)。提供此选项会将 Avro 逻辑类型转换为其相应的 BigQuery 数据类型。 (可选。默认为 false )。只写。 | 写 |
datePartition | 数据将写入的日期分区。应该是以YYYYMMDD 格式给出的日期字符串。可用于覆盖单个分区的数据,如下所示:
(选修的)。只写。 还可以与不同的分区类型一起使用,例如: 时间: YYYYMMDDHH 月份: YYYYMM 年份: YYYY “DIRECT”写入方法不支持。 | 写 |
partitionField | 如果指定了该字段,则表将按该字段进行分区。 对于时间分区,请与选项“partitionType”一起指定。 对于整数范围分区,请指定 3 个选项:“partitionRangeStart”、“partitionRangeEnd”、“partitionRangeInterval”。 对于时间分区,该字段必须是顶级 TIMESTAMP 或 DATE 字段;对于整数范围分区,该字段必须是 INT64。它的模式必须是NULLABLE或REQUIRED 。如果没有为时间分区表设置该选项,则该表将按伪列分区,通过 '_PARTITIONTIME' as TIMESTAMP 类型或'_PARTITIONDATE' as DATE 类型引用。(选修的)。 “DIRECT”写入方法不支持。 | 写 |
partitionExpirationMs | 表中分区的存储保留的毫秒数。分区中的存储将具有其分区时间加上该值的过期时间。 (选修的)。 “DIRECT”写入方法不支持。 | 写 |
partitionType | 用于指定时间分区。 支持的类型有: HOUR, DAY, MONTH, YEAR 对于要进行时间分区的目标表,此选项是必需的。 (可选。如果指定了 PartitionField,则默认为 DAY)。 “DIRECT”写入方法不支持。 | 写 |
partitionRangeStart 、 partitionRangeEnd 、 partitionRangeInterval | 用于指定整数范围分区。 对于要进行整数范围分区的目标表来说,这些选项是必需的。 必须指定所有 3 个选项。 “DIRECT”写入方法不支持。 | 写 |
clusteredFields | 一串不重复的顶级列,以逗号分隔。 (选修的)。 | 写 |
allowFieldAddition | 将 ALLOW_FIELD_ADDITION SchemaUpdateOption 添加到 BigQuery LoadJob。允许的值为true 和false 。(可选。默认为 false )。仅由“INDIRECT”写入方法支持。 | 写 |
allowFieldRelaxation | 将 ALLOW_FIELD_RELAXATION SchemaUpdateOption 添加到 BigQuery LoadJob。允许的值为true 和false 。(可选。默认为 false )。仅受“INDIRECT”写入方法支持。 | 写 |
proxyAddress | 代理服务器的地址。代理必须是 HTTP 代理,地址应采用“主机:端口”格式。也可以在 Spark 配置 ( spark.conf.set(...) ) 或 Hadoop 配置 ( fs.gs.proxy.address ) 中设置。(可选。仅当通过代理连接到 GCP 时才需要。) | 读/写 |
proxyUsername | 用于连接到代理的用户名。也可以在 Spark 配置 ( spark.conf.set(...) ) 或 Hadoop 配置 ( fs.gs.proxy.username ) 中设置。(可选。仅当通过具有身份验证的代理连接到 GCP 时才需要。) | 读/写 |
proxyPassword | 用于连接到代理的密码。也可以在 Spark 配置 ( spark.conf.set(...) ) 或 Hadoop 配置 ( fs.gs.proxy.password ) 中设置。(可选。仅当通过具有身份验证的代理连接到 GCP 时才需要。) | 读/写 |
httpMaxRetry | 对 BigQuery 的低级别 HTTP 请求的最大重试次数。也可以在 Spark 配置 ( spark.conf.set("httpMaxRetry", ...) ) 或 Hadoop 配置 ( fs.gs.http.max.retry ) 中设置。(可选。默认值为 10) | 读/写 |
httpConnectTimeout | 与 BigQuery 建立连接的超时时间(以毫秒为单位)。也可以在 Spark 配置 ( spark.conf.set("httpConnectTimeout", ...) ) 或 Hadoop 配置 ( fs.gs.http.connect-timeout ) 中设置。(可选。默认为 60000 毫秒。0 表示无限超时,负数表示 20000) | 读/写 |
httpReadTimeout | 从已建立的连接读取数据的超时(以毫秒为单位)。也可以在 Spark 配置 ( spark.conf.set("httpReadTimeout", ...) ) 或 Hadoop 配置 ( fs.gs.http.read-timeout ) 中设置。(可选。默认为 60000 毫秒。0 表示无限超时,负数表示 20000) | 读 |
arrowCompressionCodec | 使用 Arrow 格式从 BigQuery 表读取时的压缩编解码器。选项: ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED 。使用 Java 时推荐的压缩编解码器是ZSTD 。(可选。默认为 COMPRESSION_UNSPECIFIED ,这意味着不会使用压缩) | 读 |
responseCompressionCodec | 压缩编解码器用于压缩 ReadRowsResponse 数据。选项: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED 、 RESPONSE_COMPRESSION_CODEC_LZ4 (可选。默认为 RESPONSE_COMPRESSION_CODEC_UNSPECIFIED ,这意味着不会使用压缩) | 读 |
cacheExpirationTimeInMinutes | 存储查询信息的内存缓存的过期时间。 要禁用缓存,请将值设置为 0。 (可选。默认为 15 分钟) | 读 |
enableModeCheckForSchemaFields | 在直接写入期间,检查目标模式中每个字段的模式是否等于相应源字段模式中的模式。 默认值为 true,即默认进行检查。如果设置为 false,则忽略模式检查。 | 写 |
enableListInference | 指示当模式为 Parquet 时是否专门使用架构推断 (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions)。 默认为 false。 | 写 |
bqChannelPoolSize | BigQueryReadClient 创建的 gRPC 通道池的(固定)大小。 为了获得最佳性能,应至少将其设置为集群执行器上的核心数量。 | 读 |
createReadSessionTimeoutInSeconds | 读取表时创建 ReadSession 的超时时间(以秒为单位)。 对于非常大的表,应该增加该值。 (可选。默认为 600 秒) | 读 |
queryJobPriority | 从 BigQuery 查询读取数据时为作业设置的优先级。允许的值为:
(可选。默认为 INTERACTIVE ) | 读/写 |
destinationTableKmsKeyName | 描述将用于保护目标 BigQuery 表的 Cloud KMS 加密密钥。与您的项目关联的 BigQuery 服务帐号需要访问此加密密钥。有关将 CMEK 与 BigQuery 结合使用的更多信息,请参阅[此处](https://cloud.google.com/bigquery/docs/customer-management-encryption#key_resource_id)。 注意:只有连接器创建的表才会被密钥加密。仅通过设置此选项不会对预先存在的未加密表进行加密。 (选修的) | 写 |
allowMapTypeConversion | 当记录有两个字段名称分别为key 和value 子字段时,布尔配置用于禁用从 BigQuery 记录到 Spark MapType 的转换。默认值为true ,允许转换。(选修的) | 读 |
spark.sql.sources.partitionOverwriteMode | 配置为在表进行范围/时间分区时指定写入时的覆盖模式。目前支持两种模式: STATIC 和DYNAMIC 。在STATIC 模式下,整个表都会被覆盖。在DYNAMIC 模式下,数据将被现有表的分区覆盖。默认值为STATIC 。(选修的) | 写 |
enableReadSessionCaching | 用于禁用读取会话缓存的布尔配置。缓存 BigQuery 读取会话以实现更快的 Spark 查询规划。默认值为true 。(选修的) | 读 |
readSessionCacheDurationMins | 配置以分钟为单位设置读取会话缓存持续时间。仅当enableReadSessionCaching 为true (默认)时才有效。允许指定缓存读取会话的持续时间。最大允许值为300 。默认值为5 。(选修的) | 读 |
bigQueryJobTimeoutInMinutes | 配置以设置 BigQuery 作业超时(以分钟为单位)。默认值为360 分钟。(选修的) | 读/写 |
snapshotTimeMillis | 以毫秒为单位指定的时间戳,用于读取表快照。默认情况下,未设置此项,并且会读取表的最新版本。 (选修的) | 读 |
bigNumericDefaultPrecision | BigNumeric 字段的替代默认精度,因为 BigQuery 默认值对于 Spark 来说太宽。值可以介于 1 和 38 之间。仅当字段具有未参数化的 BigNumeric 类型时,才使用此默认值。请注意,如果实际数据的精度超过指定的精度,可能会导致数据丢失。 (选修的) | 读/写 |
bigNumericDefaultScale | BigNumeric 字段的替代默认比例。值可以介于 0 到 38 之间,并且小于 bigNumericFieldsPrecision。仅当字段具有未参数化的 BigNumeric 类型时,才使用此默认值。请注意,如果实际数据规模大于指定规模,可能会导致数据丢失。 (选修的) | 读/写 |
还可以使用spark-submit
的--conf
参数或gcloud dataproc submit spark
的--properties
参数在代码外部设置选项。为了使用它,请在前面添加前缀spark.datasource.bigquery.
对于任何选项,例如, spark.conf.set("temporaryGcsBucket", "some-bucket")
也可以设置为--conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
。
除DATETIME
和TIME
之外,所有 BigQuery 数据类型都直接映射到相应的 Spark SQL 数据类型。以下是所有映射:
BigQuery 标准 SQL 数据类型 | 星火SQL 数据类型 | 笔记 |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | 请参阅数字和 BigNumeric 支持 |
BIGNUMERIC | DecimalType | 请参阅数字和 BigNumeric 支持 |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark 没有 DATETIME 类型。 Spark 字符串可以写入现有的 BQ DATETIME 列,前提是它采用 BQ DATETIME 文字的格式。 * 对于 Spark 3.4+,BQ DATETIME 被读取为 Spark 的 TimestampNTZ 类型,即 java LocalDateTime |
TIME | LongType 、 StringType * | Spark 没有 TIME 类型。生成的 long 表示自午夜以来的微秒数,可以安全地转换为 TimestampType,但这会导致日期被推断为当天。因此,时间可以保留为长,用户可以根据需要进行投射。 当转换为时间戳 TIME 时,与 DATETIME 具有相同的时区问题 * Spark 字符串可以写入现有的 BQ TIME 列,前提是它采用 BQ TIME 文字的格式。 |
JSON | StringType | Spark 没有 JSON 类型。这些值被读取为字符串。要将 JSON 写回 BigQuery,需要满足以下条件:
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery 没有 MAP 类型,因此与 Apache Avro 和 BigQuery Load 作业等其他转换类似,连接器将 Spark Map 转换为 REPEATED STRUCT<key,value>。这意味着虽然可以写入和读取地图,但不支持在 BigQuery 上运行使用地图语义的 SQL。要使用 BigQuery SQL 引用地图的值,请查看 BigQuery 文档。由于这些不兼容性,存在一些限制:
|
支持 Spark ML Vector 和 Matrix,包括它们的密集和稀疏版本。数据保存为 BigQuery RECORD。请注意,字段的描述中添加了一个后缀,其中包括字段的 Spark 类型。
为了将这些类型写入 BigQuery,请使用 ORC 或 Avro 中间格式,并将它们作为 Row 的列(即不是结构中的字段)。
BigQuery的BigNumeric的精度为76.76(第77位为部分),小数位数为38。由于这个精度和小数位数超出了spark的DecimalType(38小数位数和38精度)支持,这意味着不能使用精度大于38的BigNumeric字段。一旦 Spark 限制被更新,连接器也会相应更新。
Spark Decimal/BigQuery Numeric 转换尝试保留类型的参数化,即NUMERIC(10,2)
将转换为Decimal(10,2)
,反之亦然。但请注意,有时参数会丢失。这意味着参数将恢复为默认值 - NUMERIC (38,9) 和 BIGNUMERIC(76,38)。这意味着目前仅支持从标准表读取 BigNumeric,而不支持从 BigQuery 视图或从 BigQuery 查询读取数据时读取。
连接器自动计算列并下推过滤 DataFrame 的SELECT
语句,例如
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
过滤到列word
并下推谓词过滤器word = 'hamlet' or word = 'Claudius'
。
如果您不希望向 BigQuery 发出多个读取请求,您可以在过滤之前缓存 DataFrame,例如:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
您还可以手动指定filter
选项,这将覆盖自动下推,Spark 将在客户端中完成其余的过滤。
伪列 _PARTITIONDATE 和 _PARTITIONTIME 不是表架构的一部分。因此,为了按分区表的分区进行查询,不要使用上面所示的 where() 方法。相反,请按以下方式添加过滤器选项:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
默认情况下,连接器在正在读取的表中每 400MB 创建一个分区(过滤之前)。这应大致对应于 BigQuery Storage API 支持的最大读取器数量。这可以使用maxParallelism
属性显式配置。 BigQuery 可能会根据服务器限制限制分区数量。
为了支持跟踪 BigQuery 资源的使用情况,连接器提供以下选项来标记 BigQuery 资源:
连接器可以启动 BigQuery 加载和查询作业。向作业添加标签的方式如下:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
这将创建标签cost_center
= analytics
和usage
= nightly_etl
。
用于注释读取和写入会话。跟踪 ID 的格式为Spark:ApplicationName:JobID
。这是一个选择加入选项,要使用它,用户需要设置traceApplicationName
属性。 JobID 由 Dataproc 作业 ID 自动生成,并回退到 Spark 应用程序 ID(例如application_1648082975639_0001
)。可以通过设置traceJobId
选项来覆盖作业ID。请注意,跟踪 ID 的总长度不能超过 256 个字符。
即使未安装在 Spark 集群上,该连接器也可以在 Jupyter 笔记本中使用。可以使用以下代码将其添加为外部 jar:
Python:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
斯卡拉:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
如果 Spark 集群使用 Scala 2.12(对于 Spark 2.4.x 是可选的,在 3.0.x 中是强制的),则相关包为 com.google.cloud.spark:spark-bigquery-with-dependency_ 2.12 :0.41.0。要了解使用的是哪个 Scala 版本,请运行以下代码:
Python:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
斯卡拉:
scala . util . Properties . versionString
除非您希望使用隐式 Scala API spark.read.bigquery("TABLE_ID")
,否则无需针对连接器进行编译。
要将连接器包含在您的项目中:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark 填充了许多指标,最终用户可以在 Spark 历史记录页面中找到这些指标。但所有这些指标都与 Spark 相关,它们是隐式收集的,无需连接器进行任何更改。但是,从 BigQuery 填充并且目前在应用程序日志中可见的指标很少,可以在驱动程序/执行程序日志中读取。
从 Spark 3.2 开始,spark 提供了 API 来在 Spark UI 页面中公开自定义指标 https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /CustomMetric.html
目前,使用此 API,连接器在读取期间公开以下 bigquery 指标
<样式>表#metricstable TD,表Th {word-break:break-word} </style>指标名称 | 描述 |
---|---|
bytes read | BigQuery字节的数量阅读 |
rows read | 读取的bigquery行读数 |
scan time | 要求在所有执行者中获得的读取行响应之间花费的时间,以毫秒为单位。 |
parse time | 分析行所花费的时间以毫秒为单位读取所有执行者。 |
spark time | 在Spark上花费的时间来处理查询(即,除了扫描和解析),所有执行者以毫秒为单位。 |
注意:要使用Spark UI页面中的指标,您需要确保在启动历史程序服务器之前, spark-bigquery-metrics-0.41.0.jar
是类路径,并且连接器版本为spark-3.2
或更高。
请参阅BigQuery定价文档。
您可以用maxParallelism
属性手动设置分区数。 BigQuery可能提供的分区少于您要求的。请参阅配置分区。
在Spark阅读后,您也可以始终重新分配。
如果分区太多,则可以超过CreateWritestream或吞吐量配额。之所以发生这种情况,是因为在每个分区中的数据串行处理时,可以在火花集群中的不同节点上并行处理独立的分区。通常,为了确保最大持续的吞吐量,您应提交配额增加请求。但是,您还可以手动减少通过在DataFrame上调用coalesce
来减轻此问题的分区数量。
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
经验法则是具有至少1GB数据的单个分区手柄。
另请注意,运行writeAtLeastOnce
属性的作业将不会遇到CreateWritestream配额错误。
该连接器需要一个GoogleCredentials的实例才能连接到BigQuery API。有多种选择可以提供:
GOOGLE_APPLICATION_CREDENTIALS
环境变量加载JSON密钥。 // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
选项中提供。 AccessTokenProvider
必须以Java或其他JVM语言(例如Scala或Kotlin)实现。它必须具有一个no-arg构造函数,也必须一个构造函数接受一个java.util.String
参数。可以使用gcpAccessTokenProviderConfig
选项提供此配置参数。如果未提供此功能,则将调用No-Arg构造函数。包含实现的罐子应在集群的类路径上。 // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
可以为特定用户名和组名称配置服务帐户的模拟,或默认使用以下属性为所有用户配置:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(默认设置)
特定用户的服务帐户模仿。
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(默认设置)
特定组的服务帐户模仿。
gcpImpersonationServiceAccount
(默认未设置)
所有用户的默认服务帐户模仿。
如果设置了上述任何属性,则指定的服务帐户将通过在访问BigQuery时生成短暂的凭据来模仿。
如果设置了多个属性,则与用户名相关联的服务帐户将优先于与匹配用户和组的组名称相关的服务帐户,而该组名称将优先于默认服务帐户。
对于不需要访问令牌刷新的更简单的应用程序,另一种选择是将访问令牌作为gcpAccessToken
配置选项传递。您可以通过运行gcloud auth application-default print-access-token
来获取访问令牌。
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
重要的是:需要在Java或其他JVM语言(例如Scala或Kotlin)中实现CredentialsProvider
和AccessTokenProvider
。包含实现的罐子应在集群的类路径上。
注意:仅应提供以上选项之一。
要连接到向前代理并验证用户凭据,请配置以下选项。
proxyAddress
:代理服务器的地址。代理必须是HTTP代理,地址应在host:port
格式。
proxyUsername
:用于连接代理的用户名。
proxyPassword
:用于连接到代理的密码。
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
同一代理参数也可以使用Spark的RuntimeConfig进行全局设置:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
您也可以在Hadoop配置中设置以下内容。
fs.gs.proxy.address
(类似于“ proxyAddress”), fs.gs.proxy.username
(类似于“ proxyusername”)和fs.gs.proxy.password
(类似于“ proxypassword”)。
如果在多个位置设置相同的参数,则优先顺序如下:
选项(“键”,“ value”)> spark.conf> hadoop配置