El conector admite la lectura de tablas de Google BigQuery en DataFrames de Spark y la escritura de DataFrames nuevamente en BigQuery. Esto se hace mediante el uso de la API Spark SQL Data Source para comunicarse con BigQuery.
La API de Storage transmite datos en paralelo directamente desde BigQuery a través de gRPC sin utilizar Google Cloud Storage como intermediario.
Tiene una serie de ventajas sobre el uso del flujo de lectura anterior basado en exportación que, en general, debería conducir a un mejor rendimiento de lectura:
No deja ningún archivo temporal en Google Cloud Storage. Las filas se leen directamente desde los servidores de BigQuery mediante los formatos de cable Arrow o Avro.
La nueva API permite el filtrado de columnas y predicados para leer solo los datos que le interesan.
Dado que BigQuery está respaldado por un almacén de datos en columnas, puede transmitir datos de manera eficiente sin leer todas las columnas.
La API de almacenamiento admite la inserción arbitraria de filtros de predicados. La versión 0.8.0-beta y superiores del conector admiten la inserción de filtros arbitrarios en Bigquery.
Hay un problema conocido en Spark que no permite la inserción de filtros en campos anidados. Por ejemplo, filtros como address.city = "Sunnyvale"
no se enviarán a Bigquery.
La API reequilibra los registros entre lectores hasta que se completan todos. Esto significa que todas las fases del Mapa finalizarán casi al mismo tiempo. Consulte este artículo de blog sobre cómo se utiliza de manera similar la fragmentación dinámica en Google Cloud Dataflow.
Consulte Configuración de particiones para obtener más detalles.
Siga estas instrucciones.
Si no tiene un entorno Apache Spark, puede crear un clúster de Cloud Dataproc con autenticación preconfigurada. En los siguientes ejemplos se supone que estás usando Cloud Dataproc, pero puedes usar spark-submit
en cualquier clúster.
Cualquier clúster de Dataproc que utilice la API necesita los alcances "bigquery" o "plataforma en la nube". Los clústeres de Dataproc tienen el alcance 'bigquery' de forma predeterminada, por lo que la mayoría de los clústeres en proyectos habilitados deberían funcionar de forma predeterminada, por ejemplo.
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
La última versión del conector está disponible públicamente en los siguientes enlaces:
versión | Enlace |
---|---|
Chispa 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (enlace HTTP) |
Chispa 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (enlace HTTP) |
Chispa 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (enlace HTTP) |
Chispa 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (enlace HTTP) |
Chispa 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (enlace HTTP) |
Chispa 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (enlace HTTP) |
Escala 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (enlace HTTP) |
Escala 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (enlace HTTP) |
Escala 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (enlace HTTP) |
Las primeras seis versiones son conectores basados en Java destinados a Spark 2.4/3.1/3.2/3.3/3.4/3.5 de todas las versiones de Scala creadas en las nuevas API de origen de datos (API de origen de datos v2) de Spark.
Los dos últimos conectores son conectores basados en Scala; utilice el contenedor correspondiente a su instalación Spark como se describe a continuación.
Conector Chispa | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
chispa-3.5-bigquery | ✓ | |||||||
chispa-3.4-bigquery | ✓ | ✓ | ||||||
chispa-3.3-bigquery | ✓ | ✓ | ✓ | |||||
chispa-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
chispa-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
chispa-2.4-bigquery | ✓ | |||||||
spark-bigquery-con-dependencias_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
spark-bigquery-con-dependencias_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
spark-bigquery-con-dependencias_2.11 | ✓ | ✓ |
Conector Imagen de Dataproc | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | Sin servidor Imagen 1.0 | Sin servidor Imagen 2.0 | Sin servidor Imagen 2.1 | Sin servidor Imagen 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
chispa-3.5-bigquery | ✓ | ✓ | ||||||||
chispa-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
chispa-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
chispa-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
chispa-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
chispa-2.4-bigquery | ✓ | ✓ | ||||||||
spark-bigquery-con-dependencias_2.13 | ✓ | ✓ | ✓ | |||||||
spark-bigquery-con-dependencias_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
spark-bigquery-con-dependencias_2.11 | ✓ | ✓ |
El conector también está disponible en el repositorio central de Maven. Se puede utilizar utilizando la opción --packages
o la propiedad de configuración spark.jars.packages
. Utilice el siguiente valor
versión | Artefacto conector |
---|---|
Chispa 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
Chispa 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
Chispa 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
Chispa 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
Chispa 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
Chispa 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
Escala 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
Escala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
Escala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
Los clústeres de Dataproc creados con la imagen 2.1 y superior, o los lotes que utilizan el servicio sin servidor de Dataproc, vienen con un conector Spark BigQuery integrado. Usar los --jars
o --packages
estándar (o alternativamente, la configuración spark.jars
/ spark.jars.packages
) no ayudará en este caso ya que el conector integrado tiene prioridad.
Para utilizar otra versión además de la incorporada, realice una de las siguientes acciones:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
o --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
para crear el clúster con un jar diferente. La URL puede apuntar a cualquier JAR de conector válido para la versión de Spark del clúster.--properties dataproc.sparkBqConnector.version=0.41.0
o --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
para crear el lote con un frasco diferente. La URL puede apuntar a cualquier conector JAR válido para la versión Spark del tiempo de ejecución. Puede ejecutar un recuento de palabras de PySpark simple en la API sin compilación ejecutando
Imagen de Dataproc 1.5 y superior
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Imagen de Dataproc 1.4 y siguientes
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
El conector utiliza la API de origen de datos Spark SQL en varios idiomas:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
o la API implícita únicamente de Scala:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
Para obtener más información, consulte ejemplos de código adicionales en Python, Scala y Java.
El conector le permite ejecutar cualquier consulta SQL SELECT estándar en BigQuery y obtener sus resultados directamente en un Spark Dataframe. Esto se hace fácilmente como se describe en el siguiente ejemplo de código:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
Lo que arroja el resultado
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
Una segunda opción es utilizar la opción query
como esta:
df = spark.read.format("bigquery").option("query", sql).load()
Tenga en cuenta que la ejecución debería ser más rápida ya que solo el resultado se transmite por cable. De manera similar, las consultas pueden incluir JOIN de manera más eficiente que ejecutar uniones en Spark o usar otras funciones de BigQuery, como subconsultas, funciones definidas por el usuario de BigQuery, tablas comodín, BigQuery ML y más.
Para utilizar esta función DEBEN establecerse las siguientes configuraciones:
viewsEnabled
debe establecerse en true
.materializationDataset
debe configurarse en un conjunto de datos donde el usuario de GCP tenga permiso de creación de tablas. materializationProject
es opcional. Nota: Como se menciona en la documentación de BigQuery, las tablas consultadas deben estar en la misma ubicación que materializationDataset
. Además, si las tablas en la SQL statement
provienen de proyectos distintos del parentProject
, utilice el nombre de tabla completo, es decir, [project].[dataset].[table]
.
Importante: esta característica se implementa ejecutando la consulta en BigQuery y guardando el resultado en una tabla temporal, de la cual Spark leerá los resultados. Esto puede agregar costos adicionales en su cuenta de BigQuery.
El conector tiene soporte preliminar para leer desde vistas de BigQuery. Tenga en cuenta que hay algunas advertencias:
collect()
o count()
.materializationProject
y materializationDataset
, respectivamente. Estas opciones también se pueden configurar globalmente llamando spark.conf.set(...)
antes de leer las vistas..option("viewsEnabled", "true")
) o configúrela globalmente llamando spark.conf.set("viewsEnabled", "true")
.materializationDataset
debe estar en la misma ubicación que la vista.Se pueden escribir DataFrames en BigQuery mediante dos métodos: directo e indirecto.
En este método, los datos se escriben directamente en BigQuery mediante la API de escritura de almacenamiento de BigQuery. Para habilitar esta opción, configure la opción writeMethod
en direct
, como se muestra a continuación:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
La escritura en tablas particionadas existentes (participación por fecha, partición por tiempo de ingesta y partición por rango) en el modo de guardado APPEND y en el modo OVERWRITE (solo partición por fecha y rango) es totalmente compatible con el conector y la API de escritura de almacenamiento de BigQuery. El uso de datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
descrito a continuación no es compatible en este momento con el método de escritura directa.
Importante: consulte la página de precios de ingesta de datos para conocer los precios de la API de escritura de almacenamiento de BigQuery.
Importante: utilice la versión 0.24.2 y superior para escrituras directas, ya que las versiones anteriores tienen un error que puede provocar la eliminación de la tabla en ciertos casos.
En este método, los datos se escriben primero en GCS y luego se cargan en BigQuery. Se debe configurar un depósito de GCS para indicar la ubicación de los datos temporales.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
Los datos se almacenan temporalmente utilizando los formatos Apache Parquet, Apache ORC o Apache Avro.
El depósito GCS y el formato también se pueden configurar globalmente usando RuntimeConfig de Spark de esta manera:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
Cuando se transmite un DataFrame a BigQuery, cada lote se escribe de la misma manera que un DataFrame que no se transmite. Tenga en cuenta que se debe especificar una ubicación de punto de control compatible con HDFS (por ejemplo: path/to/HDFS/dir
o gs://checkpoint-bucket/checkpointDir
).
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
Importante: El conector no configura el conector GCS para evitar conflictos con otro conector GCS, si existe. Para utilizar las capacidades de escritura del conector, configure el conector GCS en su clúster como se explica aquí.
La API admite una serie de opciones para configurar la lectura.
<estilo> tabla#propertytable td, tabla th { word-break:break-word } </style>Propiedad | Significado | Uso |
---|---|---|
table | La tabla de BigQuery en el formato [[project:]dataset.]table . Se recomienda utilizar el parámetro de path de load() / save() en su lugar. Esta opción ha quedado obsoleta y se eliminará en una versión futura.(Obsoleto) | Leer/Escribir |
dataset | El conjunto de datos que contiene la tabla. Esta opción debe usarse con tablas y vistas estándar, pero no al cargar resultados de consultas. (Opcional a menos que se omita en table ) | Leer/Escribir |
project | El ID del proyecto de Google Cloud de la tabla. Esta opción debe usarse con tablas y vistas estándar, pero no al cargar resultados de consultas. (Opcional. El valor predeterminado es el proyecto de la cuenta de servicio que se está utilizando) | Leer/Escribir |
parentProject | El ID del proyecto de Google Cloud de la tabla para facturar por la exportación. (Opcional. El valor predeterminado es el proyecto de la cuenta de servicio que se está utilizando) | Leer/Escribir |
maxParallelism | El número máximo de particiones en las que dividir los datos. El número real puede ser menor si BigQuery considera que los datos son lo suficientemente pequeños. Si no hay suficientes ejecutores para programar un lector por partición, algunas particiones pueden estar vacías. Importante: El parámetro antiguo ( parallelism ) todavía se admite, pero en modo obsoleto. Se habrá eliminado en la versión 1.0 del conector.(Opcional. El valor predeterminado es el mayor entre el Paralelismo mínimo preferido y 20 000). | Leer |
preferredMinParallelism | El número mínimo preferido de particiones para dividir los datos. El número real puede ser menor si BigQuery considera que los datos son lo suficientemente pequeños. Si no hay suficientes ejecutores para programar un lector por partición, algunas particiones pueden estar vacías. (Opcional. El valor predeterminado es el mínimo de 3 veces el paralelismo predeterminado y maxParallelism de la aplicación). | Leer |
viewsEnabled | Permite que el conector lea desde vistas y no solo tablas. Lea la sección correspondiente antes de activar esta opción. (Opcional. El valor predeterminado es false ) | Leer |
materializationProject | La identificación del proyecto donde se creará la vista materializada. (Opcional. El valor predeterminado es la identificación del proyecto de la vista) | Leer |
materializationDataset | El conjunto de datos donde se creará la vista materializada. Este conjunto de datos debe estar en la misma ubicación que la vista o las tablas consultadas. (Opcional. El valor predeterminado es ver el conjunto de datos) | Leer |
materializationExpirationTimeInMinutes | El tiempo de vencimiento de la tabla temporal que contiene los datos materializados de una vista o consulta, en minutos. Tenga en cuenta que el conector puede reutilizar la tabla temporal debido al uso de la memoria caché local y para reducir el cálculo de BigQuery, por lo que los valores muy bajos pueden provocar errores. El valor debe ser un número entero positivo. (Opcional. El valor predeterminado es 1440 o 24 horas) | Leer |
readDataFormat | Formato de datos para leer desde BigQuery. Opciones: ARROW , AVRO (Opcional. El valor predeterminado es ARROW ) | Leer |
optimizedEmptyProjection | El conector utiliza una lógica de proyección vacía optimizada (seleccionar sin columnas), utilizada para la ejecución count() . Esta lógica toma los datos directamente de los metadatos de la tabla o realiza un `SELECT COUNT(*) WHERE...` mucho más eficiente en caso de que haya un filtro. Puede cancelar el uso de esta lógica estableciendo esta opción en false .(Opcional, el valor predeterminado es true ) | Leer |
pushAllFilters | Si se establece en true , el conector envía todos los filtros que Spark puede delegar a la API de BigQuery Storage. Esto reduce la cantidad de datos que deben enviarse desde los servidores de la API de BigQuery Storage a los clientes Spark. Esta opción ha quedado obsoleta y se eliminará en una versión futura.(Opcional, el valor predeterminado es true )(Obsoleto) | Leer |
bigQueryJobLabel | Se puede usar para agregar etiquetas a la consulta iniciada por el conector y cargar trabajos de BigQuery. Se pueden configurar varias etiquetas. (Opcional) | Leer |
bigQueryTableLabel | Se puede utilizar para agregar etiquetas a la tabla mientras se escribe en una tabla. Se pueden configurar varias etiquetas. (Opcional) | Escribir |
traceApplicationName | Nombre de la aplicación utilizada para rastrear las sesiones de lectura y escritura de BigQuery Storage. Es necesario configurar el nombre de la aplicación para configurar el ID de seguimiento en las sesiones. (Opcional) | Leer |
traceJobId | ID de trabajo utilizado para rastrear las sesiones de lectura y escritura de BigQuery Storage. (Opcional, el valor predeterminado es el ID del trabajo de Dataproc; de lo contrario, se usa el ID de la aplicación Spark) | Leer |
createDisposition | Especifica si el trabajo puede crear nuevas tablas. Los valores permitidos son:
(Opcional. El valor predeterminado es CREATE_IF_NEEDED). | Escribir |
writeMethod | Controla el método en el que se escriben los datos en BigQuery. Los valores disponibles son direct para usar la API de escritura de almacenamiento de BigQuery e indirect , que escriben los datos primero en GCS y luego activan una operación de carga de BigQuery. Ver más aquí(Opcional, por defecto es indirect ) | Escribir |
writeAtLeastOnce | Garantiza que los datos se escriban en BigQuery al menos una vez. Esta es una garantía menor que exactamente una vez. Esto es adecuado para escenarios de streaming en los que los datos se escriben continuamente en pequeños lotes. (Opcional. El valor predeterminado es false )Solo se admite con el método de escritura "DIRECT" y el modo NO es "Sobrescribir". | Escribir |
temporaryGcsBucket | El depósito de GCS que contiene temporalmente los datos antes de cargarlos en BigQuery. Requerido a menos que esté configurado en la configuración de Spark ( spark.conf.set(...) ).No es compatible con el método de escritura "DIRECT". | Escribir |
persistentGcsBucket | El depósito de GCS que contiene los datos antes de cargarlos en BigQuery. Si se informa, los datos no se eliminarán después de escribirlos en BigQuery. No es compatible con el método de escritura "DIRECT". | Escribir |
persistentGcsPath | La ruta de GCS que contiene los datos antes de cargarlos en BigQuery. Se usa solo con persistentGcsBucket .No es compatible con el método de escritura "DIRECT". | Escribir |
intermediateFormat | El formato de los datos antes de cargarlos en BigQuery; los valores pueden ser "parquet", "orc" o "avro". Para utilizar el formato Avro, se debe agregar el paquete spark-avro en tiempo de ejecución. (Opcional. Por defecto parquet ). Sólo en escritura. Solo se admite para el método de escritura "INDIRECT". | Escribir |
useAvroLogicalTypes | Al cargar desde Avro (`.option("intermediateFormat", "avro")`), BigQuery usa los tipos de Avro subyacentes en lugar de los tipos lógicos [de forma predeterminada](https://cloud.google.com/bigquery/docs/ carga-datos-almacenamiento-en-la-nube-avro#logic_types). Al proporcionar esta opción, se convierten los tipos lógicos de Avro en sus tipos de datos de BigQuery correspondientes. (Opcional. El valor predeterminado es false ). Sólo en escritura. | Escribir |
datePartition | La partición de fecha en la que se escribirán los datos. Debe ser una cadena de fecha con el formato YYYYMMDD . Se puede utilizar para sobrescribir los datos de una sola partición, así:
(Opcional). Sólo en escritura. También se puede utilizar con diferentes tipos de particiones como: HORA: YYYYMMDDHH MES: YYYYMM AÑO: YYYY No es compatible con el método de escritura "DIRECT". | Escribir |
partitionField | Si se especifica este campo, la tabla está particionada por este campo. Para la partición de tiempo, especifique junto con la opción `partitionType`. Para la partición de rango entero, especifique junto con las 3 opciones: `partitionRangeStart`, `partitionRangeEnd, `partitionRangeInterval`. El campo debe ser un campo TIMESTAMP o DATE de nivel superior para la partición de tiempo, o INT64 para la partición de rango entero. Su modo debe ser NULLABLE o REQUIRED . Si la opción no está configurada para una tabla particionada por tiempo, la tabla se dividirá mediante pseudocolumna, a la que se hará referencia mediante '_PARTITIONTIME' as TIMESTAMP o '_PARTITIONDATE' as DATE .(Opcional). No es compatible con el método de escritura "DIRECT". | Escribir |
partitionExpirationMs | Número de milisegundos durante los cuales se mantendrá el almacenamiento de las particiones en la tabla. El almacenamiento en una partición tendrá un tiempo de vencimiento igual al tiempo de su partición más este valor. (Opcional). No es compatible con el método de escritura "DIRECT". | Escribir |
partitionType | Se utiliza para especificar la partición del tiempo. Los tipos admitidos son: HOUR, DAY, MONTH, YEAR Esta opción es obligatoria para que una tabla de destino esté particionada por tiempo. (Opcional. El valor predeterminado es DÍA si se especifica PartitionField). No es compatible con el método de escritura "DIRECT". | Escribir |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | Se utiliza para especificar la partición de rango entero. Estas opciones son obligatorias para que una tabla de destino tenga particiones de rango entero. Se deben especificar las 3 opciones. No es compatible con el método de escritura "DIRECT". | Escribir |
clusteredFields | Una cadena de columnas de nivel superior no repetidas separadas por comas. (Opcional). | Escribir |
allowFieldAddition | Agrega ALLOW_FIELD_ADDITION SchemaUpdateOption a BigQuery LoadJob. Los valores permitidos son true y false .(Opcional. El valor predeterminado es false ).Compatible únicamente con el método de escritura "INDIRECT". | Escribir |
allowFieldRelaxation | Agrega ALLOW_FIELD_RELAXATION SchemaUpdateOption a BigQuery LoadJob. Los valores permitidos son true y false .(Opcional. El valor predeterminado es false ).Compatible únicamente con el método de escritura "INDIRECT". | Escribir |
proxyAddress | Dirección del servidor proxy. El proxy debe ser un proxy HTTP y la dirección debe estar en formato "host:puerto". Alternativamente, se puede configurar en la configuración de Spark ( spark.conf.set(...) ) o en la configuración de Hadoop ( fs.gs.proxy.address ).(Opcional. Requerido solo si se conecta a GCP a través de proxy). | Leer/Escribir |
proxyUsername | El nombre de usuario utilizado para conectarse al proxy. Alternativamente, se puede configurar en la configuración de Spark ( spark.conf.set(...) ) o en la configuración de Hadoop ( fs.gs.proxy.username ).(Opcional. Requerido solo si se conecta a GCP a través de un proxy con autenticación). | Leer/Escribir |
proxyPassword | La contraseña utilizada para conectarse al proxy. Alternativamente, se puede configurar en la configuración de Spark ( spark.conf.set(...) ) o en la configuración de Hadoop ( fs.gs.proxy.password ).(Opcional. Requerido solo si se conecta a GCP a través de un proxy con autenticación). | Leer/Escribir |
httpMaxRetry | La cantidad máxima de reintentos para las solicitudes HTTP de bajo nivel a BigQuery. Alternativamente, se puede configurar en la configuración de Spark ( spark.conf.set("httpMaxRetry", ...) ) o en la configuración de Hadoop ( fs.gs.http.max.retry ).(Opcional. El valor predeterminado es 10) | Leer/Escribir |
httpConnectTimeout | El tiempo de espera en milisegundos para establecer una conexión con BigQuery. Alternativamente, se puede configurar en la configuración de Spark ( spark.conf.set("httpConnectTimeout", ...) ) o en la configuración de Hadoop ( fs.gs.http.connect-timeout ).(Opcional. El valor predeterminado es 60000 ms. 0 para un tiempo de espera infinito, un número negativo para 20000) | Leer/Escribir |
httpReadTimeout | El tiempo de espera en milisegundos para leer datos de una conexión establecida. Alternativamente, se puede configurar en la configuración de Spark ( spark.conf.set("httpReadTimeout", ...) ) o en la configuración de Hadoop ( fs.gs.http.read-timeout ).(Opcional. El valor predeterminado es 60000 ms. 0 para un tiempo de espera infinito, un número negativo para 20000) | Leer |
arrowCompressionCodec | Códec de compresión al leer de una tabla de BigQuery cuando se usa el formato Arrow. Opciones: ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED . El códec de compresión recomendado es ZSTD cuando se utiliza Java.(Opcional. El valor predeterminado es COMPRESSION_UNSPECIFIED , lo que significa que no se utilizará compresión) | Leer |
responseCompressionCodec | Códec de compresión utilizado para comprimir los datos de ReadRowsResponse. Opciones: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 (Opcional. El valor predeterminado es RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , lo que significa que no se utilizará compresión) | Leer |
cacheExpirationTimeInMinutes | El tiempo de vencimiento de la caché en memoria que almacena información de consultas. Para deshabilitar el almacenamiento en caché, establezca el valor en 0. (Opcional. El valor predeterminado es 15 minutos) | Leer |
enableModeCheckForSchemaFields | Comprueba que el modo de cada campo en el esquema de destino sea igual al modo en el esquema del campo de origen correspondiente, durante la escritura DIRECTA. El valor predeterminado es verdadero, es decir, la verificación se realiza de forma predeterminada. Si se establece en falso, se ignora la verificación de modo. | Escribir |
enableListInference | Indica si se debe utilizar la inferencia de esquema específicamente cuando el modo es Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). El valor predeterminado es falso. | Escribir |
bqChannelPoolSize | El tamaño (fijo) del grupo de canales gRPC creado por BigQueryReadClient. Para un rendimiento óptimo, esto debe establecerse en al menos la cantidad de núcleos en los ejecutores del clúster. | Leer |
createReadSessionTimeoutInSeconds | El tiempo de espera en segundos para crear una ReadSession al leer una tabla. Para una mesa extremadamente grande, este valor debe aumentarse. (Opcional. El valor predeterminado es 600 segundos) | Leer |
queryJobPriority | Niveles de prioridad establecidos para el trabajo al leer datos de la consulta de BigQuery. Los valores permitidos son:
(Opcional. El valor predeterminado es INTERACTIVE ) | Leer/Escribir |
destinationTableKmsKeyName | Describe la clave de cifrado de Cloud KMS que se usará para proteger la tabla de BigQuery de destino. La cuenta de servicio de BigQuery asociada a tu proyecto requiere acceso a esta clave de cifrado. Para obtener más información sobre el uso de CMEK con BigQuery, consulte [aquí](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id). Aviso: La tabla se cifrará mediante la clave solo si la creó el conector. Una tabla no cifrada preexistente no se cifrará simplemente configurando esta opción. (Opcional) | Escribir |
allowMapTypeConversion | Configuración booleana para deshabilitar la conversión de registros de BigQuery a Spark MapType cuando el registro tiene dos subcampos con nombres de campo como key y value . El valor predeterminado es true , lo que permite la conversión.(Opcional) | Leer |
spark.sql.sources.partitionOverwriteMode | Configuración para especificar el modo de sobrescritura en escritura cuando la tabla está particionada por rango/tiempo. Actualmente se admiten dos modos: STATIC y DYNAMIC . En modo STATIC , se sobrescribe toda la tabla. En el modo DYNAMIC , los datos se sobrescriben con las particiones de la tabla existente. El valor predeterminado es STATIC .(Opcional) | Escribir |
enableReadSessionCaching | Configuración booleana para deshabilitar el almacenamiento en caché de la sesión de lectura. Almacena en caché las sesiones de lectura de BigQuery para permitir una planificación de consultas Spark más rápida. El valor predeterminado es true .(Opcional) | Leer |
readSessionCacheDurationMins | Configure para establecer la duración del almacenamiento en caché de la sesión de lectura en minutos. Solo funciona si enableReadSessionCaching es true (predeterminado). Permite especificar la duración de las sesiones de lectura en caché. El valor máximo permitido es 300 . El valor predeterminado es 5 .(Opcional) | Leer |
bigQueryJobTimeoutInMinutes | Configure para establecer el tiempo de espera del trabajo de BigQuery en minutos. El valor predeterminado es 360 minutos.(Opcional) | Leer/Escribir |
snapshotTimeMillis | Una marca de tiempo especificada en milisegundos que se usará para leer una instantánea de la tabla. De forma predeterminada, esto no está configurado y se lee la última versión de una tabla. (Opcional) | Leer |
bigNumericDefaultPrecision | Una precisión predeterminada alternativa para los campos BigNumeric, ya que el valor predeterminado de BigQuery es demasiado amplio para Spark. Los valores pueden estar entre 1 y 38. Este valor predeterminado se usa solo cuando el campo tiene un tipo BigNumeric sin parámetros. Tenga en cuenta que puede haber pérdida de datos si la precisión de los datos reales es mayor que la especificada. (Opcional) | Leer/Escribir |
bigNumericDefaultScale | Una escala predeterminada alternativa para campos BigNumeric. Los valores pueden estar entre 0 y 38, y menores que bigNumericFieldsPrecision. Este valor predeterminado se usa solo cuando el campo tiene un tipo BigNumeric sin parámetros. Tenga en cuenta que puede haber pérdida de datos si la escala de los datos reales es mayor que la especificada. (Opcional) | Leer/Escribir |
Las opciones también se pueden configurar fuera del código, usando el parámetro --conf
de spark-submit
o el parámetro --properties
de gcloud dataproc submit spark
. Para utilizar esto, anteponga el prefijo spark.datasource.bigquery.
a cualquiera de las opciones, por ejemplo, spark.conf.set("temporaryGcsBucket", "some-bucket")
también se puede configurar como --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
.
Con la excepción de DATETIME
y TIME
todos los tipos de datos de BigQuery se asignan al tipo de datos Spark SQL correspondiente. Aquí están todas las asignaciones:
Tipo de datos SQL estándar de BigQuery | Chispa SQL Tipo de datos | Notas |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Consulte el soporte numérico y BigNumeric. |
BIGNUMERIC | DecimalType | Consulte el soporte numérico y BigNumeric. |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark no tiene tipo DATETIME. La cadena Spark se puede escribir en una columna BQ DATETIME existente siempre que esté en el formato para literales BQ DATETIME. * Para Spark 3.4+, BQ DATETIME se lee como el tipo TimestampNTZ de Spark, es decir, java LocalDateTime |
TIME | LongType , StringType * | Spark no tiene tipo TIME. Los largos generados, que indican microsegundos desde la medianoche, se pueden convertir de forma segura a TimestampType, pero esto hace que la fecha se infiera como el día actual. Por lo tanto, los tiempos se dejan largos y el usuario puede emitir si lo desea. Al transmitir a Timestamp TIME, tengo los mismos problemas de zona horaria que DATETIME * La cadena Spark se puede escribir en una columna BQ TIME existente siempre que esté en el formato para literales BQ TIME. |
JSON | StringType | Spark no tiene tipo JSON. Los valores se leen como cadena. Para volver a escribir JSON en BigQuery, se REQUIEREN las siguientes condiciones:
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery no tiene ningún tipo de MAP, por lo tanto, al igual que otras conversiones como Apache Avro y trabajos de carga de BigQuery, el conector convierte un mapa de Spark en una ESTRUCTURA REPETIDA <clave, valor>. Esto significa que, si bien la escritura y lectura de mapas está disponible, no se admite la ejecución de un SQL en BigQuery que use semántica de mapas. Para consultar los valores del mapa mediante BigQuery SQL, consulta la documentación de BigQuery. Debido a estas incompatibilidades, se aplican algunas restricciones:
|
Se admiten Spark ML Vector y Matrix, incluidas sus versiones densa y dispersa. Los datos se guardan como un REGISTRO de BigQuery. Observe que se agrega un sufijo a la descripción del campo que incluye el tipo de chispa del campo.
Para escribir esos tipos en BigQuery, use el formato intermedio ORC o Avro y téngalos como columna de la Fila (es decir, no como un campo en una estructura).
BigNumeric de BigQuery tiene una precisión de 76,76 (el dígito 77 es parcial) y una escala de 38. Dado que esta precisión y escala están más allá del soporte DecimalType de Spark (escala 38 y precisión 38), significa que los campos BigNumeric con una precisión mayor que 38 no se pueden usar. . Una vez que se actualice esta limitación de Spark, el conector se actualizará en consecuencia.
La conversión Spark Decimal/BigQuery Numeric intenta preservar la parametrización del tipo, es decir, NUMERIC(10,2)
se convertirá a Decimal(10,2)
y viceversa. Sin embargo, observe que hay casos en los que se pierden los parámetros. Esto significa que los parámetros volverán a los valores predeterminados: NUMÉRICOS (38,9) y BIGNUMÉRICOS (76,38). Esto significa que, por el momento, la lectura de BigNumeric solo se admite desde una tabla estándar, pero no desde la vista de BigQuery ni al leer datos de una consulta de BigQuery.
El conector calcula automáticamente los filtros de columna y pushdown de la instrucción SELECT
del DataFrame, por ejemplo
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
filtra a la word
de la columna y presiona hacia abajo el filtro de predicado word = 'hamlet' or word = 'Claudius'
.
Si no desea realizar varias solicitudes de lectura a BigQuery, puede almacenar en caché el DataFrame antes de filtrar, por ejemplo:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
También puede especificar manualmente la opción filter
, que anulará la inserción automática y Spark hará el resto del filtrado en el cliente.
Las pseudocolumnas _PARTITIONDATE y _PARTITIONTIME no forman parte del esquema de la tabla. Por lo tanto, para realizar consultas por particiones de tablas particionadas, no utilice el método where() que se muestra arriba. En su lugar, agregue una opción de filtro de la siguiente manera:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
De forma predeterminada, el conector crea una partición por cada 400 MB en la tabla que se lee (antes de filtrar). Esto debería corresponder aproximadamente a la cantidad máxima de lectores admitidos por la API de BigQuery Storage. Esto se puede configurar explícitamente con la propiedad maxParallelism
. BigQuery puede limitar la cantidad de particiones según las restricciones del servidor.
Para admitir el seguimiento del uso de los recursos de BigQuery, los conectores ofrecen las siguientes opciones para etiquetar recursos de BigQuery:
El conector puede iniciar trabajos de consulta y carga de BigQuery. Agregar etiquetas a los trabajos se realiza de la siguiente manera:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
Esto creará etiquetas cost_center
= analytics
y usage
= nightly_etl
.
Se utiliza para anotar las sesiones de lectura y escritura. El ID de seguimiento tiene el formato Spark:ApplicationName:JobID
. Esta es una opción de participación voluntaria y, para usarla, el usuario debe configurar la propiedad traceApplicationName
. JobID se genera automáticamente mediante el ID de trabajo de Dataproc, con un respaldo al ID de la aplicación Spark (como application_1648082975639_0001
). El ID del trabajo se puede anular configurando la opción traceJobId
. Tenga en cuenta que la longitud total del ID de seguimiento no puede superar los 256 caracteres.
El conector se puede utilizar en portátiles Jupyter incluso si no está instalado en el clúster Spark. Se puede agregar como un jar externo usando el siguiente código:
Pitón:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
Escala:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
En caso de que el clúster Spark use Scala 2.12 (es opcional para Spark 2.4.x, obligatorio en 3.0.x), entonces el paquete relevante es com.google.cloud.spark:spark-bigquery-with-dependencies_ 2.12 :0.41.0. Para saber qué versión de Scala se utiliza, ejecute el siguiente código:
Pitón:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
Escala:
scala . util . Properties . versionString
A menos que desee utilizar la API de Scala implícita spark.read.bigquery("TABLE_ID")
, no es necesario compilar con el conector.
Para incluir el conector en su proyecto:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark completa muchas métricas que el usuario final puede encontrar en la página del historial de Spark. Pero todas estas métricas están relacionadas con la chispa y se recopilan implícitamente sin ningún cambio en el conector. Pero hay pocas métricas que se completan desde BigQuery y que actualmente son visibles en los registros de la aplicación que se pueden leer en los registros del controlador/ejecutor.
Desde Spark 3.2 en adelante, Spark ha proporcionado la API para exponer métricas personalizadas en la página de la interfaz de usuario de Spark https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /CustomMetric.html
Actualmente, utilizando esta API, el conector expone las siguientes métricas BigQuery durante la lectura
<Syle> Tabla#Metricstable TD, Table Th {Word-Break: Break-Word} </style>Nombre métrico | Descripción |
---|---|
bytes read | Número de bytes de BigQuery leído |
rows read | Número de filas de grandes lecturas |
scan time | La cantidad de tiempo dedicado a la respuesta de las filas de lectura solicitada para obtener en todos los ejecutores, en milisegundos. |
parse time | La cantidad de tiempo dedicado a analizar las filas se lee en todos los ejecutores, en milisegundos. |
spark time | La cantidad de tiempo que pasó en Spark para procesar las consultas (es decir, aparte de escanear y análisis), en todos los ejecutores, en milisegundos. |
Nota: Para usar las métricas en la página de UI de Spark, debe asegurarse de que el spark-bigquery-metrics-0.41.0.jar
sea la ruta de clase antes de iniciar el servidor del historial y la versión del conector es spark-3.2
o superior.
Vea la documentación de precios BigQuery.
Puede establecer manualmente el número de particiones con la propiedad maxParallelism
. BigQuery puede proporcionar menos particiones de las que solicita. Consulte la configuración de la partición.
También siempre puede reparar después de leer en Spark.
Si hay demasiadas particiones, se pueden exceder las cuotas de creación o cuotas de rendimiento. Esto ocurre porque mientras los datos dentro de cada partición se procesan en serie, las particiones independientes pueden procesarse en paralelo en diferentes nodos dentro del clúster Spark. En general, para garantizar el máximo rendimiento sostenido, debe presentar una solicitud de aumento de cuotas. Sin embargo, también puede reducir manualmente el número de particiones que se escriben llamando coalesce
en el marco de datos para mitigar este problema.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
Una regla general es tener un manejo de una sola partición al menos 1 GB de datos.
También tenga en cuenta que un trabajo que se ejecuta con la propiedad writeAtLeastOnce
activada no encontrará errores de cuotas CreateWriteReam.
El conector necesita una instancia de una googlecredentials para conectarse a las API BigQuery. Hay múltiples opciones para proporcionarlo:
GOOGLE_APPLICATION_CREDENTIALS
, como se describe aquí. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
. AccessTokenProvider
debe implementarse en Java u otro idioma JVM como Scala o Kotlin. Debe tener un constructor no arg o un constructor que acepte un solo argumento java.util.String
. Este parámetro de configuración se puede suministrar utilizando la opción gcpAccessTokenProviderConfig
. Si esto no se proporciona, entonces se llamará al constructor no Arg. El JAR que contiene la implementación debe estar en el ClassPath del clúster. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
La suplantación de la cuenta de servicio se puede configurar para un nombre de usuario específico y un nombre de grupo, o para todos los usuarios de forma predeterminada utilizando las propiedades a continuación:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(no configurado de forma predeterminada)
La suplantación de la cuenta de servicio para un usuario específico.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(no configurado de forma predeterminada)
La suplantación de la cuenta de servicio para un grupo específico.
gcpImpersonationServiceAccount
(no establecido por defecto)
Isplantación de cuenta de servicio predeterminada para todos los usuarios.
Si alguna de las propiedades anteriores se establece, la cuenta de servicio especificada se suplicará generando credenciales de corta duración al acceder a BigQuery.
Si se establece más de una propiedad, la cuenta de servicio asociada con el nombre de usuario tendrá prioridad sobre la cuenta de servicio asociada con el nombre del grupo para un usuario y grupo coincidente, lo que a su vez tendrá prioridad sobre la suplantación de la cuenta de servicio predeterminada.
Para una aplicación más simple, donde no se requiere actualización del token de acceso, otra alternativa es pasar el token de acceso como la opción de configuración de gcpAccessToken
. Puede obtener el token de acceso ejecutando gcloud auth application-default print-access-token
.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
IMPORTANTE: CredentialsProvider
y AccessTokenProvider
deben implementarse en Java u otro idioma JVM como Scala o Kotlin. El JAR que contiene la implementación debe estar en el ClassPath del clúster.
Aviso: solo se debe proporcionar una de las opciones anteriores.
Para conectarse a un proxy de reenvío y autenticar las credenciales del usuario, configure las siguientes opciones.
proxyAddress
: dirección del servidor proxy. El proxy debe ser un proxy HTTP y la dirección debe estar en el formato host:port
.
proxyUsername
: el nombre de usuario utilizado para conectarse al proxy.
proxyPassword
: la contraseña utilizada para conectarse al proxy.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
Los mismos parámetros proxy también se pueden establecer a nivel mundial utilizando RuntimeConfig de Spark como este:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
También puede establecer lo siguiente en la configuración de Hadoop.
fs.gs.proxy.address
(similar a "proxyaddress"), fs.gs.proxy.username
(similar a "proxyusername") y fs.gs.proxy.password
(similar a "proxypassword").
Si el mismo parámetro se establece en múltiples lugares, el orden de prioridad es el siguiente:
opción ("clave", "valor")> Spark.conf> Configuración de Hadoop