O conector oferece suporte à leitura de tabelas do Google BigQuery nos DataFrames do Spark e à gravação de DataFrames de volta no BigQuery. Isso é feito usando a API Spark SQL Data Source para se comunicar com o BigQuery.
A API Storage transmite dados em paralelo diretamente do BigQuery via gRPC sem usar o Google Cloud Storage como intermediário.
Ele tem uma série de vantagens em relação ao fluxo de leitura anterior baseado em exportação que geralmente deve levar a um melhor desempenho de leitura:
Não deixa nenhum arquivo temporário no Google Cloud Storage. As linhas são lidas diretamente dos servidores do BigQuery usando os formatos de ligação Arrow ou Avro.
A nova API permite que a filtragem de colunas e predicados leia apenas os dados de seu interesse.
Como o BigQuery é apoiado por um armazenamento de dados colunar, ele pode transmitir dados com eficiência sem ler todas as colunas.
A API Storage oferece suporte ao empilhamento arbitrário de filtros predicados. A versão 0.8.0-beta e superior do conector oferece suporte ao pushdown de filtros arbitrários para o Bigquery.
Há um problema conhecido no Spark que não permite o empilhamento de filtros em campos aninhados. Por exemplo - filtros como address.city = "Sunnyvale"
não serão empilhados no Bigquery.
A API reequilibra os registros entre os leitores até que todos sejam concluídos. Isso significa que todas as fases do Mapa terminarão quase simultaneamente. Consulte este artigo do blog sobre como a fragmentação dinâmica é usada de forma semelhante no Google Cloud Dataflow.
Consulte Configurando o particionamento para obter mais detalhes.
Siga estas instruções.
Se você não tiver um ambiente Apache Spark, poderá criar um cluster do Cloud Dataproc com autenticação pré-configurada. Os exemplos a seguir pressupõem que você esteja usando o Cloud Dataproc, mas é possível usar spark-submit
em qualquer cluster.
Qualquer cluster do Dataproc que usa a API precisa dos escopos 'bigquery' ou 'cloud-platform'. Os clusters do Dataproc têm o escopo 'bigquery' por padrão. Portanto, a maioria dos clusters em projetos habilitados deve funcionar por padrão, por exemplo
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
A versão mais recente do conector está disponível publicamente nos seguintes links:
versão | Link |
---|---|
Faísca 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (link HTTP) |
Faísca 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (link HTTP) |
Faísca 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (link HTTP) |
Faísca 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (link HTTP) |
Faísca 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (link HTTP) |
Faísca 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (link HTTP) |
Escala 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (link HTTP) |
Escala 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (link HTTP) |
Escala 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (link HTTP) |
As primeiras seis versões são conectores baseados em Java direcionados ao Spark 2.4/3.1/3.2/3.3/3.4/3.5 de todas as versões do Scala criadas nas novas APIs de fonte de dados (API de fonte de dados v2) do Spark.
Os dois conectores finais são conectores baseados em Scala. Use o jar relevante para sua instalação do Spark conforme descrito abaixo.
Conector Faísca | 2.3 | 2.4 | 3,0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
faísca-3.5-bigquery | ✓ | |||||||
faísca-3.4-bigquery | ✓ | ✓ | ||||||
spark-3.3-bigquery | ✓ | ✓ | ✓ | |||||
spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
spark-2.4-bigquery | ✓ | |||||||
spark-bigquery-com-dependências_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
spark-bigquery-com-dependências_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
spark-bigquery-com-dependências_2.11 | ✓ | ✓ |
Conector Imagem do Dataproc | 1.3 | 1.4 | 1,5 | 2,0 | 2.1 | 2.2 | Sem servidor Imagem 1.0 | Sem servidor Imagem 2.0 | Sem servidor Imagem 2.1 | Sem servidor Imagem 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
faísca-3.5-bigquery | ✓ | ✓ | ||||||||
faísca-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
spark-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
spark-2.4-bigquery | ✓ | ✓ | ||||||||
spark-bigquery-com-dependências_2.13 | ✓ | ✓ | ✓ | |||||||
spark-bigquery-com-dependências_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
spark-bigquery-com-dependências_2.11 | ✓ | ✓ |
O conector também está disponível no repositório Maven Central. Ele pode ser usado usando a opção --packages
ou a propriedade de configuração spark.jars.packages
. Use o seguinte valor
versão | Artefato de conector |
---|---|
Faísca 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
Faísca 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
Faísca 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
Faísca 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
Faísca 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
Faísca 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
Escala 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
Escala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
Escala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
Os clusters do Dataproc criados usando a imagem 2.1 e superior ou os lotes que usam o serviço sem servidor do Dataproc vêm com o conector Spark BigQuery integrado. Usar o padrão --jars
ou --packages
(ou alternativamente, a configuração spark.jars
/ spark.jars.packages
) não ajudará neste caso, pois o conector integrado tem precedência.
Para usar outra versão que não a integrada, siga um destes procedimentos:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
ou --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
para criar o cluster com um jar diferente. A URL pode apontar para qualquer JAR de conector válido para a versão Spark do cluster.--properties dataproc.sparkBqConnector.version=0.41.0
ou --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
para criar o lote com um jar diferente. A URL pode apontar para qualquer JAR de conector válido para a versão Spark do runtime. Você pode executar uma contagem de palavras simples do PySpark na API sem compilação executando
Imagem do Dataproc 1.5 e superior
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Imagem do Dataproc 1.4 e anteriores
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
O conector usa a API de fonte de dados Spark SQL de linguagem cruzada:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
ou a API implícita apenas do Scala:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
Para obter mais informações, consulte exemplos de código adicionais em Python, Scala e Java.
O conector permite executar qualquer consulta SQL SELECT padrão no BigQuery e buscar seus resultados diretamente em um Spark Dataframe. Isso é feito facilmente conforme descrito no exemplo de código a seguir:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
O que produz o resultado
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
Uma segunda opção é usar a opção query
assim:
df = spark.read.format("bigquery").option("query", sql).load()
Observe que a execução deve ser mais rápida, pois apenas o resultado é transmitido pela rede. De maneira semelhante, as consultas podem incluir JOINs com mais eficiência do que executar junções no Spark ou usar outros recursos do BigQuery, como subconsultas, funções definidas pelo usuário do BigQuery, tabelas curinga, BigQuery ML e muito mais.
Para usar este recurso, as seguintes configurações DEVEM ser definidas:
viewsEnabled
deve ser definido como true
.materializationDataset
deve ser definido como um conjunto de dados em que o usuário do GCP tenha permissão de criação de tabela. materializationProject
é opcional. Observação: conforme mencionado na documentação do BigQuery, as tabelas consultadas devem estar no mesmo local que materializationDataset
. Além disso, se as tabelas na SQL statement
forem de projetos diferentes de parentProject
, use o nome de tabela totalmente qualificado, ou seja, [project].[dataset].[table]
.
Importante: esse recurso é implementado executando a consulta no BigQuery e salvando o resultado em uma tabela temporária, da qual o Spark lerá os resultados. Isso pode adicionar custos adicionais à sua conta do BigQuery.
O conector tem suporte preliminar para leitura de visualizações do BigQuery. Observe que existem algumas advertências:
collect()
ou count()
.materializationProject
e materializationDataset
, respectivamente. Essas opções também podem ser definidas globalmente chamando spark.conf.set(...)
antes de ler as visualizações..option("viewsEnabled", "true")
) ou defina-a globalmente chamando spark.conf.set("viewsEnabled", "true")
.materializationDataset
deve estar no mesmo local da visualização.A gravação de DataFrames no BigQuery pode ser feita usando dois métodos: Direto e Indireto.
Neste método, os dados são gravados diretamente no BigQuery usando a API BigQuery Storage Write. Para habilitar esta opção, defina a opção writeMethod
como direct
, conforme mostrado abaixo:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
A gravação em tabelas particionadas existentes (particionadas por data, particionadas por tempo de ingestão e particionadas por intervalo) no modo de salvamento APPEND e no modo OVERWRITE (somente particionadas por data e intervalo) é totalmente compatível com o conector e a API BigQuery Storage Write. O uso de datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
descrito abaixo não é suportado neste momento pelo método de gravação direta.
Importante: consulte a página de preços de ingestão de dados sobre os preços da API BigQuery Storage Write.
Importante: Use a versão 0.24.2 e superior para gravações diretas, pois as versões anteriores possuem um bug que pode causar a exclusão da tabela em certos casos.
Neste método, os dados são gravados primeiro no GCS e depois carregados no BigQuery. Um bucket do GCS deve ser configurado para indicar o local temporário dos dados.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
Os dados são armazenados temporariamente usando os formatos Apache Parquet, Apache ORC ou Apache Avro.
O bucket do GCS e o formato também podem ser definidos globalmente usando o RuntimeConfig do Spark assim:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
Ao fazer streaming de um DataFrame para o BigQuery, cada lote é gravado da mesma maneira que um DataFrame sem streaming. Observe que um local de ponto de verificação compatível com HDFS (por exemplo: path/to/HDFS/dir
ou gs://checkpoint-bucket/checkpointDir
) deve ser especificado.
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
Importante: O conector não configura o conector GCS, para evitar conflito com outro conector GCS, se existir. Para usar os recursos de gravação do conector, configure o conector GCS em seu cluster conforme explicado aqui.
A API oferece suporte a várias opções para configurar a leitura
<style> tabela#propertytable td, tabela th { quebra de palavra: quebra de palavra } </style>Propriedade | Significado | Uso |
---|---|---|
table | A tabela do BigQuery no formato [[project:]dataset.]table . Recomenda-se usar o parâmetro de path load() / save() . Esta opção foi descontinuada e será removida em uma versão futura.(Descontinuado) | Ler/Escrever |
dataset | O conjunto de dados que contém a tabela. Esta opção deve ser usada com tabelas e visualizações padrão, mas não ao carregar resultados de consulta. (Opcional, a menos que omitido na table ) | Ler/Escrever |
project | O ID do projeto do Google Cloud da tabela. Esta opção deve ser usada com tabelas e visualizações padrão, mas não ao carregar resultados de consulta. (Opcional. O padrão é o projeto da conta de serviço que está sendo usada) | Ler/Escrever |
parentProject | O ID do projeto do Google Cloud da tabela a ser faturada pela exportação. (Opcional. O padrão é o projeto da conta de serviço que está sendo usada) | Ler/Escrever |
maxParallelism | O número máximo de partições nas quais dividir os dados. O número real poderá ser menor se o BigQuery considerar os dados pequenos o suficiente. Se não houver executores suficientes para agendar um leitor por partição, algumas partições poderão ficar vazias. Importante: O parâmetro antigo ( parallelism ) ainda é suportado, mas em modo obsoleto. Ele será removido na versão 1.0 do conector.(Opcional. O padrão é o maior entre MinParallelism preferido e 20.000).) | Ler |
preferredMinParallelism | O número mínimo preferido de partições para dividir os dados. O número real poderá ser menor se o BigQuery considerar os dados pequenos o suficiente. Se não houver executores suficientes para agendar um leitor por partição, algumas partições poderão ficar vazias. (Opcional. O padrão é o menor entre 3 vezes o paralelismo e maxParallelism padrão do aplicativo.) | Ler |
viewsEnabled | Permite que o conector leia visualizações e não apenas tabelas. Por favor, leia a seção relevante antes de ativar esta opção. (Opcional. O padrão é false ) | Ler |
materializationProject | O ID do projeto onde a visão materializada será criada (Opcional. O padrão é o ID do projeto da visualização) | Ler |
materializationDataset | O conjunto de dados onde a visão materializada será criada. Este conjunto de dados deve estar no mesmo local que a visualização ou as tabelas consultadas. (Opcional. O padrão é visualizar o conjunto de dados) | Ler |
materializationExpirationTimeInMinutes | O tempo de expiração da tabela temporária que contém os dados materializados de uma visualização ou consulta, em minutos. Observe que o conector pode reutilizar a tabela temporária devido ao uso do cache local e para reduzir a computação do BigQuery. Portanto, valores muito baixos podem causar erros. O valor deve ser um número inteiro positivo. (Opcional. O padrão é 1440 ou 24 horas) | Ler |
readDataFormat | Formato de dados para leitura do BigQuery. Opções: ARROW , AVRO (Opcional. O padrão é ARROW ) | Ler |
optimizedEmptyProjection | O conector usa uma lógica de projeção vazia otimizada (seleção sem colunas), usada para execução count() . Esta lógica pega os dados diretamente dos metadados da tabela ou executa um `SELECT COUNT(*) WHERE...` muito eficiente caso haja um filtro. Você pode cancelar o uso desta lógica definindo esta opção como false .(Opcional, o padrão é true ) | Ler |
pushAllFilters | Se definido como true , o conector envia todos os filtros que o Spark pode delegar à API BigQuery Storage. Isso reduz a quantidade de dados que precisam ser enviados dos servidores da API BigQuery Storage para clientes Spark. Esta opção foi descontinuada e será removida em uma versão futura.(Opcional, o padrão é true )(Descontinuado) | Ler |
bigQueryJobLabel | Pode ser usado para adicionar rótulos à consulta iniciada pelo conector e carregar jobs do BigQuery. Vários rótulos podem ser definidos. (Opcional) | Ler |
bigQueryTableLabel | Pode ser usado para adicionar rótulos à tabela enquanto grava em uma tabela. Vários rótulos podem ser definidos. (Opcional) | Escrever |
traceApplicationName | Nome do aplicativo usado para rastrear sessões de leitura e gravação do BigQuery Storage. A configuração do nome do aplicativo é necessária para configurar o ID de rastreamento nas sessões. (Opcional) | Ler |
traceJobId | ID do job usado para rastrear sessões de leitura e gravação do BigQuery Storage. (Opcional, o padrão é o ID do job do Dataproc, caso contrário, usa o ID do aplicativo Spark) | Ler |
createDisposition | Especifica se o trabalho tem permissão para criar novas tabelas. Os valores permitidos são:
(Opcional. O padrão é CREATE_IF_NEEDED). | Escrever |
writeMethod | Controla o método no qual os dados são gravados no BigQuery. Os valores disponíveis são direct para usar a API BigQuery Storage Write e indirect , que gravam os dados primeiro no GCS e depois acionam uma operação de carregamento do BigQuery. Veja mais aqui(Opcional, o padrão é indirect ) | Escrever |
writeAtLeastOnce | Garante que os dados sejam gravados no BigQuery pelo menos uma vez. Esta é uma garantia menor do que exatamente uma vez. Isso é adequado para cenários de streaming nos quais os dados são gravados continuamente em pequenos lotes. (Opcional. O padrão é false )Suportado apenas pelo método de gravação `DIRECT` e o modo NÃO é `Overwrite`. | Escrever |
temporaryGcsBucket | O bucket do GCS que contém temporariamente os dados antes de serem carregados no BigQuery. Obrigatório, a menos que seja definido na configuração do Spark ( spark.conf.set(...) ).Não suportado pelo método de gravação `DIRECT`. | Escrever |
persistentGcsBucket | O bucket do GCS que contém os dados antes de serem carregados no BigQuery. Se informado, os dados não serão excluídos após gravá-los no BigQuery. Não suportado pelo método de gravação `DIRECT`. | Escrever |
persistentGcsPath | O caminho do GCS que contém os dados antes de serem carregados no BigQuery. Usado apenas com persistentGcsBucket .Não suportado pelo método de gravação `DIRECT`. | Escrever |
intermediateFormat | O formato dos dados antes de serem carregados no BigQuery. Os valores podem ser "parquet", "orc" ou "avro". Para usar o formato Avro, o pacote spark-avro deve ser adicionado em tempo de execução. (Opcional. O padrão é parquet ). Somente na gravação. Suportado apenas para o método de gravação `INDIRETO`. | Escrever |
useAvroLogicalTypes | Ao carregar do Avro (`.option("intermediateFormat", "avro")`), o BigQuery usa os tipos Avro subjacentes em vez dos tipos lógicos [por padrão](https://cloud.google.com/bigquery/docs/ carregando-dados-armazenamento em nuvem-avro#logical_types). Fornecer esta opção converte os tipos lógicos Avro em seus tipos de dados correspondentes do BigQuery. (Opcional. O padrão é false ). Somente na gravação. | Escrever |
datePartition | A partição de data na qual os dados serão gravados. Deve ser uma string de data fornecida no formato YYYYMMDD . Pode ser usado para sobrescrever os dados de uma única partição, assim:
(Opcional). Somente na gravação. Também pode ser usado com diferentes tipos de partição, como: HORA: YYYYMMDDHH MÊS: YYYYMM ANO: YYYY Não suportado pelo método de gravação `DIRECT`. | Escrever |
partitionField | Se este campo for especificado, a tabela será particionada por este campo. Para particionamento de tempo, especifique junto com a opção `partitionType`. Para particionamento de intervalo inteiro, especifique junto com as 3 opções: `partitionRangeStart`, `partitionRangeEnd, `partitionRangeInterval`. O campo deve ser um campo TIMESTAMP ou DATE de nível superior para particionamento de tempo ou INT64 para particionamento de intervalo inteiro. Seu modo deve ser NULLABLE ou REQUIRED . Se a opção não estiver definida para uma tabela particionada por tempo, a tabela será particionada por pseudocoluna, referenciada por '_PARTITIONTIME' as TIMESTAMP ou '_PARTITIONDATE' as DATE .(Opcional). Não suportado pelo método de gravação `DIRECT`. | Escrever |
partitionExpirationMs | Número de milissegundos durante os quais manter o armazenamento das partições na tabela. O armazenamento em uma partição terá um tempo de expiração igual ao tempo de partição mais este valor. (Opcional). Não suportado pelo método de gravação `DIRECT`. | Escrever |
partitionType | Usado para especificar o particionamento de tempo. Os tipos suportados são: HOUR, DAY, MONTH, YEAR Esta opção é obrigatória para que uma tabela de destino seja particionada por tempo. (Opcional. O padrão é DAY se PartitionField for especificado). Não suportado pelo método de gravação `DIRECT`. | Escrever |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | Usado para especificar o particionamento de intervalo inteiro. Essas opções são obrigatórias para que uma tabela de destino seja particionada em intervalo inteiro. Todas as 3 opções devem ser especificadas. Não suportado pelo método de gravação `DIRECT`. | Escrever |
clusteredFields | Uma sequência de colunas de nível superior não repetidas, separadas por vírgula. (Opcional). | Escrever |
allowFieldAddition | Adiciona o ALLOW_FIELD_ADDITION SchemaUpdateOption ao BigQuery LoadJob. Os valores permitidos são true e false .(Opcional. O padrão é false ).Suportado apenas pelo método de gravação `INDIRETO`. | Escrever |
allowFieldRelaxation | Adiciona o ALLOW_FIELD_RELAXATION SchemaUpdateOption ao BigQuery LoadJob. Os valores permitidos são true e false .(Opcional. O padrão é false ).Suportado apenas pelo método de gravação `INDIRETO`. | Escrever |
proxyAddress | Endereço do servidor proxy. O proxy deve ser um proxy HTTP e o endereço deve estar no formato `host:port`. Pode ser definido alternativamente na configuração do Spark ( spark.conf.set(...) ) ou na configuração do Hadoop ( fs.gs.proxy.address ).(Opcional. Obrigatório apenas se estiver conectado ao GCP via proxy.) | Ler/Escrever |
proxyUsername | O userName usado para conectar-se ao proxy. Pode ser definido alternativamente na configuração do Spark ( spark.conf.set(...) ) ou na configuração do Hadoop ( fs.gs.proxy.username ).(Opcional. Obrigatório apenas se estiver conectado ao GCP por meio de proxy com autenticação.) | Ler/Escrever |
proxyPassword | A senha usada para conectar-se ao proxy. Pode ser definido alternativamente na configuração do Spark ( spark.conf.set(...) ) ou na configuração do Hadoop ( fs.gs.proxy.password ).(Opcional. Obrigatório apenas se estiver conectado ao GCP por meio de proxy com autenticação.) | Ler/Escrever |
httpMaxRetry | O número máximo de novas tentativas para solicitações HTTP de baixo nível para o BigQuery. Pode ser definido alternativamente na configuração do Spark ( spark.conf.set("httpMaxRetry", ...) ) ou na configuração do Hadoop ( fs.gs.http.max.retry ).(Opcional. O padrão é 10) | Ler/Escrever |
httpConnectTimeout | O tempo limite em milissegundos para estabelecer uma conexão com o BigQuery. Pode ser definido alternativamente na configuração do Spark ( spark.conf.set("httpConnectTimeout", ...) ) ou na configuração do Hadoop ( fs.gs.http.connect-timeout ).(Opcional. O padrão é 60.000 ms. 0 para um tempo limite infinito, um número negativo para 20.000) | Ler/Escrever |
httpReadTimeout | O tempo limite em milissegundos para ler dados de uma conexão estabelecida. Pode ser definido alternativamente na configuração do Spark ( spark.conf.set("httpReadTimeout", ...) ) ou na configuração do Hadoop ( fs.gs.http.read-timeout ).(Opcional. O padrão é 60.000 ms. 0 para um tempo limite infinito, um número negativo para 20.000) | Ler |
arrowCompressionCodec | Codec de compactação durante a leitura de uma tabela do BigQuery ao usar o formato Arrow. Opções: ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED . O codec de compactação recomendado é ZSTD ao usar Java.(Opcional. O padrão é COMPRESSION_UNSPECIFIED , o que significa que nenhuma compactação será usada) | Ler |
responseCompressionCodec | Codec de compactação usado para compactar os dados ReadRowsResponse. Opções: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 (Opcional. O padrão é RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , o que significa que nenhuma compactação será usada) | Ler |
cacheExpirationTimeInMinutes | O tempo de expiração do cache na memória que armazena informações de consulta. Para desativar o cache, defina o valor como 0. (Opcional. O padrão é 15 minutos) | Ler |
enableModeCheckForSchemaFields | Verifica se o modo de cada campo no esquema de destino é igual ao modo no esquema do campo de origem correspondente, durante a gravação DIRETA. O valor padrão é verdadeiro, ou seja, a verificação é feita por padrão. Se definido como falso, a verificação de modo será ignorada. | Escrever |
enableListInference | Indica se a inferência de esquema deve ser usada especificamente quando o modo é Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). O padrão é falso. | Escrever |
bqChannelPoolSize | O tamanho (fixo) do pool de canais gRPC criado pelo BigQueryReadClient. Para um desempenho ideal, isso deve ser definido como pelo menos o número de núcleos nos executores do cluster. | Ler |
createReadSessionTimeoutInSeconds | O tempo limite em segundos para criar uma ReadSession ao ler uma tabela. Para tabelas extremamente grandes este valor deve ser aumentado. (Opcional. O padrão é 600 segundos) | Ler |
queryJobPriority | Níveis de prioridade definidos para o trabalho durante a leitura de dados da consulta do BigQuery. Os valores permitidos são:
(Opcional. O padrão é INTERACTIVE ) | Ler/Escrever |
destinationTableKmsKeyName | Descreve a chave de criptografia do Cloud KMS que será usada para proteger a tabela de destino do BigQuery. A conta de serviço do BigQuery associada ao seu projeto requer acesso a essa chave de criptografia. para mais informações sobre como usar o CMEK com o BigQuery, consulte [aqui](https://cloud.google.com/bigquery/docs/customer-owned-encryption#key_resource_id). Aviso: A tabela será criptografada pela chave somente se for criada pelo conector. Uma tabela não criptografada pré-existente não será criptografada apenas definindo esta opção. (Opcional) | Escrever |
allowMapTypeConversion | Configuração booleana para desativar a conversão de registros do BigQuery para Spark MapType quando o registro tiver dois subcampos com nomes de campos como key e value . O valor padrão é true , o que permite a conversão.(Opcional) | Ler |
spark.sql.sources.partitionOverwriteMode | Configuração para especificar o modo de substituição na gravação quando a tabela é particionada por intervalo/tempo. Atualmente são suportados dois modos: STATIC e DYNAMIC . No modo STATIC , toda a tabela é sobrescrita. No modo DYNAMIC , os dados são substituídos por partições da tabela existente. O valor padrão é STATIC .(Opcional) | Escrever |
enableReadSessionCaching | Configuração booleana para desabilitar o cache da sessão de leitura. Armazena sessões de leitura do BigQuery em cache para permitir um planejamento mais rápido de consultas do Spark. O valor padrão é true .(Opcional) | Ler |
readSessionCacheDurationMins | Configuração para definir a duração do cache da sessão de leitura em minutos. Funciona apenas se enableReadSessionCaching for true (padrão). Permite especificar a duração das sessões de leitura em cache. O valor máximo permitido é 300 . O valor padrão é 5 .(Opcional) | Ler |
bigQueryJobTimeoutInMinutes | Configuração para definir o tempo limite do job do BigQuery em minutos. O valor padrão é 360 minutos.(Opcional) | Ler/Escrever |
snapshotTimeMillis | Um carimbo de data/hora especificado em milissegundos a ser usado para ler um instantâneo de tabela. Por padrão, isso não é definido e a versão mais recente de uma tabela é lida. (Opcional) | Ler |
bigNumericDefaultPrecision | Uma precisão padrão alternativa para campos BigNumeric, já que o padrão do BigQuery é muito amplo para o Spark. Os valores podem estar entre 1 e 38. Esse padrão é usado somente quando o campo possui um tipo BigNumeric não parametrizado. Observe que pode haver perda de dados se a precisão real dos dados for maior do que a especificada. (Opcional) | Ler/Escrever |
bigNumericDefaultScale | Uma escala padrão alternativa para campos BigNumeric. Os valores podem estar entre 0 e 38 e menores que bigNumericFieldsPrecision. Este padrão é usado somente quando o campo possui um tipo BigNumeric não parametrizado. Observe que pode haver perda de dados se a escala real dos dados for maior do que a especificada. (Opcional) | Ler/Escrever |
As opções também podem ser definidas fora do código, usando o parâmetro --conf
do spark-submit
ou o parâmetro --properties
do gcloud dataproc submit spark
. Para usar isso, acrescente o prefixo spark.datasource.bigquery.
para qualquer uma das opções, por exemplo spark.conf.set("temporaryGcsBucket", "some-bucket")
também pode ser definido como --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
.
Com exceção de DATETIME
e TIME
todos os tipos de dados direcionados do BigQuery são mapeados para o tipo de dados Spark SQL correspondente. Aqui estão todos os mapeamentos:
Tipo de dados SQL padrão do BigQuery | Faísca SQL Tipo de dados | Notas |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Consulte o suporte numérico e BigNumeric |
BIGNUMERIC | DecimalType | Consulte o suporte numérico e BigNumeric |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark não tem tipo DATETIME. A string Spark pode ser gravada em uma coluna BQ DATETIME existente, desde que esteja no formato para literais BQ DATETIME. * Para Spark 3.4+, BQ DATETIME é lido como o tipo TimestampNTZ do Spark, ou seja, java LocalDateTime |
TIME | LongType , StringType * | Spark não tem tipo TIME. Os longos gerados, que indicam microssegundos desde a meia-noite, podem ser convertidos com segurança para TimestampType, mas isso faz com que a data seja inferida como o dia atual. Assim, os tempos são deixados longos e o usuário pode lançar se quiser. Ao transmitir para Timestamp TIME, há os mesmos problemas de fuso horário que DATETIME * A string Spark pode ser gravada em uma coluna BQ TIME existente, desde que esteja no formato para literais BQ TIME. |
JSON | StringType | Spark não tem tipo JSON. Os valores são lidos como String. Para gravar JSON de volta no BigQuery, as seguintes condições são NECESSÁRIAS :
|
ARRAY<STRUCT<key,value>> | MapType | O BigQuery não tem tipo MAP, portanto, semelhante a outras conversões, como trabalhos Apache Avro e BigQuery Load, o conector converte um Spark Map em um REPEATED STRUCT<key,value>. Isso significa que, embora a gravação e a leitura de mapas estejam disponíveis, a execução de um SQL no BigQuery que usa semântica de mapa não é compatível. Para consultar os valores do mapa usando o BigQuery SQL, consulte a documentação do BigQuery. Devido a estas incompatibilidades, aplicam-se algumas restrições:
|
O Spark ML Vector e Matrix são suportados, incluindo suas versões densa e esparsa. Os dados são salvos como um BigQuery RECORD. Observe que um sufixo é adicionado à descrição do campo que inclui o tipo spark do campo.
Para gravar esses tipos no BigQuery, use o formato intermediário ORC ou Avro e coloque-os como coluna da linha (ou seja, não como um campo em uma estrutura).
O BigNumeric do BigQuery tem uma precisão de 76,76 (o 77º dígito é parcial) e uma escala de 38. Como essa precisão e escala estão além do suporte a DecimalType do Spark (escala 38 e precisão 38), isso significa que os campos BigNumeric com precisão maior que 38 não podem ser usados . Assim que esta limitação do Spark for atualizada, o conector será atualizado de acordo.
A conversão Spark Decimal/BigQuery Numeric tenta preservar a parametrização do tipo, ou seja, NUMERIC(10,2)
será convertido para Decimal(10,2)
e vice-versa. Observe, entretanto, que há casos em que os parâmetros são perdidos. Isso significa que os parâmetros serão revertidos para os padrões - NUMERIC (38,9) e BIGNUMERIC(76,38). Isso significa que, no momento, a leitura do BigNumeric é compatível apenas em uma tabela padrão, mas não na visualização do BigQuery ou ao ler dados de uma consulta do BigQuery.
O conector calcula automaticamente a coluna e o pushdown filtra a instrução SELECT
do DataFrame, por exemplo
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
filtros para a coluna word
e empurrou para baixo o filtro de predicado word = 'hamlet' or word = 'Claudius'
.
Se não quiser fazer várias solicitações de leitura ao BigQuery, você pode armazenar em cache o DataFrame antes de filtrar, por exemplo:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
Você também pode especificar manualmente a opção filter
, que substituirá o pushdown automático e o Spark fará o restante da filtragem no cliente.
As pseudocolunas _PARTITIONDATE e _PARTITIONTIME não fazem parte do esquema da tabela. Portanto, para consultar pelas partições de tabelas particionadas, não use o método where() mostrado acima. Em vez disso, adicione uma opção de filtro da seguinte maneira:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
Por padrão, o conector cria uma partição por 400 MB na tabela que está sendo lida (antes da filtragem). Isso deve corresponder aproximadamente ao número máximo de leitores compatíveis com a API BigQuery Storage. Isso pode ser configurado explicitamente com a propriedade maxParallelism
. O BigQuery pode limitar o número de partições com base nas restrições do servidor.
Para oferecer suporte ao rastreamento do uso de recursos do BigQuery, os conectores oferecem as seguintes opções para marcar recursos do BigQuery:
O conector pode iniciar jobs de carga e consulta do BigQuery. A adição de rótulos aos trabalhos é feita da seguinte maneira:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
Isso criará os rótulos cost_center
= analytics
e usage
= nightly_etl
.
Usado para anotar as sessões de leitura e gravação. O ID de rastreamento tem o formato Spark:ApplicationName:JobID
. Esta é uma opção opcional e para usá-la o usuário precisa configurar a propriedade traceApplicationName
. JobID é gerado automaticamente pelo ID do job do Dataproc, com um substituto para o ID do aplicativo Spark (como application_1648082975639_0001
). O ID do trabalho pode ser substituído configurando a opção traceJobId
. Observe que o comprimento total do ID de rastreamento não pode exceder 256 caracteres.
O conector pode ser usado em notebooks Jupyter mesmo que não esteja instalado no cluster Spark. Ele pode ser adicionado como um jar externo usando o seguinte código:
Pitão:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
Escala:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
Caso o cluster Spark esteja usando Scala 2.12 (é opcional para Spark 2.4.x, obrigatório em 3.0.x), o pacote relevante é com.google.cloud.spark:spark-bigquery-with-dependencies_ 2.12 :0.41.0. Para saber qual versão do Scala é usada, execute o seguinte código:
Pitão:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
Escala:
scala . util . Properties . versionString
A menos que você queira usar a API Scala implícita spark.read.bigquery("TABLE_ID")
, não há necessidade de compilar no conector.
Para incluir o conector em seu projeto:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
O Spark preenche muitas métricas que podem ser encontradas pelo usuário final na página de histórico do Spark. Mas todas essas métricas estão relacionadas ao Spark e são coletadas implicitamente sem qualquer alteração no conector. Mas existem poucas métricas que são preenchidas no BigQuery e atualmente estão visíveis nos logs do aplicativo que podem ser lidos nos logs do driver/executor.
A partir do Spark 3.2, a Spark forneceu a API para expor métricas personalizadas na página da interface do Spark https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metrric /Customtric.html
Atualmente, usando esta API, o Connector expõe as seguintes métricas de BigQuery durante a leitura
<estilo> tabela#metricstable td, tabela th {word-break: break-word} </style>Nome métrico | Descrição |
---|---|
bytes read | Número de bytes de BigQuery lidos |
rows read | Número de linhas BigQuery lidas |
scan time | A quantidade de tempo gasto entre as linhas de leitura resposta solicitada a obtida em todos os executores, em milissegundos. |
parse time | A quantidade de tempo gasto para analisar as linhas lidas em todos os executores, em milissegundos. |
spark time | A quantidade de tempo gasto em Spark para processar as consultas (ou seja, além da digitalização e análise), em todos os executores, em milissegundos. |
Nota: Para usar as métricas na página da interface do usuário do Spark, você precisa garantir que o spark-bigquery-metrics-0.41.0.jar
seja o caminho da classe antes de iniciar o servidor histórico e a versão do conector é spark-3.2
ou acima.
Veja a documentação de preços do BigQuery.
Você pode definir manualmente o número de partições com a propriedade maxParallelism
. BigQuery pode fornecer menos partições do que você solicita. Consulte Configurando o particionamento.
Você também pode sempre repartição depois de ler em Spark.
Se houver muitas partições, as cotas da CreateWritestream ou da taxa de transferência poderão ser excedidas. Isso ocorre porque, embora os dados de cada partição sejam processados em série, partições independentes podem ser processadas em paralelo em diferentes nós dentro do cluster Spark. Geralmente, para garantir a taxa de transferência máxima sustentada, você deve registrar uma solicitação de aumento da cota. No entanto, você também pode reduzir manualmente o número de partições que estão sendo escritas chamando coalesce
no quadro de dados para mitigar esse problema.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
Uma regra geral é ter um único identificador de partição pelo menos 1 GB de dados.
Observe também que um trabalho em execução com a propriedade writeAtLeastOnce
ativada não encontrará erros de cotas CreateWritEstream.
O conector precisa de uma instância de um GoogleCrerencials para se conectar às APIs BigQuery. Existem várias opções para fornecê -lo:
GOOGLE_APPLICATION_CREDENTIALS
, conforme descrito aqui. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
. AccessTokenProvider
deve ser implementado em Java ou em outro idioma JVM, como Scala ou Kotlin. Ele deve ter um construtor não-Arg ou um construtor aceitando um único argumento java.util.String
. Este parâmetro de configuração pode ser fornecido usando a opção gcpAccessTokenProviderConfig
. Se isso não for fornecido, o construtor não-Arg será chamado. O frasco que contém a implementação deve estar no caminho de classe do cluster. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
A representação da conta de serviço pode ser configurada para um nome de usuário específico e um nome de grupo, ou para todos os usuários por padrão usando as propriedades abaixo:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(não definido por padrão)
A representação da conta de serviço para um usuário específico.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(não definido por padrão)
A representação da conta de serviço para um grupo específico.
gcpImpersonationServiceAccount
(não definido por padrão)
Padrão da conta de serviço padrão para todos os usuários.
Se alguma das propriedades acima estiver definida, a conta de serviço especificada será representada gerando credenciais de curta duração ao acessar o BigQuery.
Se mais de uma propriedade estiver definida, a conta de serviço associada ao nome de usuário terá precedência sobre a conta de serviço associada ao nome do grupo para um usuário e grupo correspondentes, que por sua vez terá precedência sobre a representação da conta de serviço padrão.
Para um aplicativo mais simples, onde o Token ACCESS Atualize não é necessário, outra alternativa é passar no token de acesso como a opção de configuração gcpAccessToken
. Você pode obter o token de acesso executando gcloud auth application-default print-access-token
.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
IMPORTANTE: O CredentialsProvider
e AccessTokenProvider
precisam ser implementados em Java ou em outro idioma da JVM, como Scala ou Kotlin. O frasco que contém a implementação deve estar no caminho de classe do cluster.
Aviso: apenas uma das opções acima deve ser fornecida.
Para conectar -se a um proxy direto e autenticar as credenciais do usuário, configure as seguintes opções.
proxyAddress
: endereço do servidor proxy. O proxy deve ser um proxy HTTP e o endereço deve estar no host:port
.
proxyUsername
: o nome de usuário usado para se conectar ao proxy.
proxyPassword
: a senha usada para conectar -se ao proxy.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
Os mesmos parâmetros de proxy também podem ser definidos globalmente usando o RunTimeconfig do Spark como este:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
Você também pode definir o seguinte na configuração do Hadoop.
fs.gs.proxy.address
(semelhante ao "proxyaddress"), fs.gs.proxy.username
(semelhante ao "proxyusername") e fs.gs.proxy.password
(semelhante ao "proxyPassword").
Se o mesmo parâmetro estiver definido em vários lugares, a ordem de prioridade for a seguinte:
Opção ("Key", "Value")> Spark.conf> Configuração do Hadoop