Коннектор поддерживает чтение таблиц Google BigQuery в DataFrames Spark и запись DataFrames обратно в BigQuery. Это делается с помощью API источника данных Spark SQL для взаимодействия с BigQuery.
Storage API передает данные параллельно напрямую из BigQuery через gRPC без использования Google Cloud Storage в качестве посредника.
Он имеет ряд преимуществ по сравнению с предыдущим потоком чтения на основе экспорта, что обычно должно приводить к повышению производительности чтения:
Он не оставляет временных файлов в Google Cloud Storage. Строки считываются напрямую с серверов BigQuery с использованием форматов Arrow или Avro.
Новый API позволяет фильтровать столбцы и предикаты только для чтения тех данных, которые вас интересуют.
Поскольку BigQuery поддерживается хранилищем данных по столбцам, он может эффективно передавать данные без чтения всех столбцов.
API хранилища поддерживает произвольное изменение фильтров предикатов. Коннектор версии 0.8.0-бета и выше поддерживает передачу произвольных фильтров в Bigquery.
В Spark существует известная проблема, которая не позволяет сбрасывать фильтры во вложенных полях. Например, такие фильтры, как address.city = "Sunnyvale"
не будут передаваться в Bigquery.
API перебалансирует записи между читателями, пока все они не будут завершены. Это означает, что все этапы карты завершатся практически одновременно. Прочитайте эту статью в блоге о том, как динамическое сегментирование аналогичным образом используется в Google Cloud Dataflow.
Дополнительные сведения см. в разделе Настройка секционирования.
Следуйте этим инструкциям.
Если у вас нет среды Apache Spark, вы можете создать кластер Cloud Dataproc с предварительно настроенной аутентификацией. В следующих примерах предполагается, что вы используете Cloud Dataproc, но spark-submit
можно использовать в любом кластере.
Любому кластеру Dataproc, использующему API, необходимы области действия «bigquery» или «облачная платформа». Кластеры Dataproc по умолчанию имеют область действия bigquery, поэтому большинство кластеров в включенных проектах должны работать по умолчанию, например
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
Последняя версия коннектора находится в публичном доступе по следующим ссылкам:
версия | Связь |
---|---|
Искра 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (ссылка HTTP) |
Искра 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (ссылка HTTP) |
Искра 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (ссылка HTTP) |
Искра 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (ссылка HTTP) |
Искра 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (ссылка HTTP) |
Искра 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (ссылка HTTP) |
Скала 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (HTTP-ссылка) |
Скала 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (HTTP-ссылка) |
Скала 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP-ссылка) |
Первые шесть версий представляют собой соединители на основе Java, предназначенные для Spark 2.4/3.1/3.2/3.3/3.4/3.5 всех версий Scala, построенных на новых API-интерфейсах источников данных (Data Source API v2) Spark.
Последние два соединителя — это соединители на основе Scala. Используйте jar, соответствующий вашей установке Spark, как описано ниже.
Разъем Искра | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3,5 |
---|---|---|---|---|---|---|---|---|
искра-3.5-bigquery | ✓ | |||||||
искра-3.4-bigquery | ✓ | ✓ | ||||||
искра-3.3-bigquery | ✓ | ✓ | ✓ | |||||
искра-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
искра-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
искра-2.4-bigquery | ✓ | |||||||
искра-bigquery-с-зависимостями_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
искра-bigquery-с-зависимостями_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
искра-bigquery-с-зависимостями_2.11 | ✓ | ✓ |
Коннектор Образ Dataproc | 1.3 | 1,4 | 1,5 | 2.0 | 2.1 | 2.2 | Бессерверный Изображение 1.0 | Бессерверный Изображение 2.0 | Бессерверный Изображение 2.1 | Бессерверный Изображение 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
искра-3.5-bigquery | ✓ | ✓ | ||||||||
искра-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
искра-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
искра-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
искра-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
искра-2.4-bigquery | ✓ | ✓ | ||||||||
искра-bigquery-с-зависимостями_2.13 | ✓ | ✓ | ✓ | |||||||
искра-bigquery-с-зависимостями_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
искра-bigquery-с-зависимостями_2.11 | ✓ | ✓ |
Соединитель также доступен в центральном репозитории Maven. Его можно использовать с помощью параметра --packages
или свойства конфигурации spark.jars.packages
. Используйте следующее значение
версия | Артефакт соединителя |
---|---|
Искра 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
Искра 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
Искра 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
Искра 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
Искра 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
Искра 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
Скала 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
Скала 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
Скала 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
Кластеры Dataproc, созданные с использованием образа 2.1 и выше, или пакеты с использованием бессерверной службы Dataproc, поставляются со встроенным соединителем Spark BigQuery. Использование стандартных --jars
или --packages
(или, альтернативно, конфигурации spark.jars
/ spark.jars.packages
) в этом случае не поможет, поскольку встроенный соединитель имеет приоритет.
Чтобы использовать версию, отличную от встроенной, выполните одно из следующих действий:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
или --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
чтобы создать кластер с другим jar. URL-адрес может указывать на любой допустимый JAR-файл соединителя для версии Spark кластера.--properties dataproc.sparkBqConnector.version=0.41.0
или --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
чтобы создать пакет с другой банкой. URL-адрес может указывать на любой допустимый JAR-файл соединителя для версии Spark среды выполнения. Вы можете запустить простой подсчет слов PySpark для API без компиляции, запустив
Образ Dataproc 1.5 и более поздних версий
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Образ Dataproc 1.4 и ниже
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
Соединитель использует межъязыковой API источника данных Spark SQL:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
или только неявный API Scala:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
Дополнительные сведения см. в дополнительных примерах кода на Python, Scala и Java.
Коннектор позволяет запускать любой стандартный запрос SQL SELECT в BigQuery и получать его результаты непосредственно в кадр данных Spark. Это легко сделать, как описано в следующем примере кода:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
Что дает результат
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
Второй вариант — использовать опцию query
следующим образом:
df = spark.read.format("bigquery").option("query", sql).load()
Обратите внимание, что выполнение должно происходить быстрее, поскольку по сети передается только результат. Аналогичным образом запросы могут включать JOIN более эффективно, чем выполнение объединений в Spark, или использовать другие функции BigQuery, такие как подзапросы, пользовательские функции BigQuery, таблицы подстановочных знаков, BigQuery ML и многое другое.
Для использования этой функции ДОЛЖНЫ быть установлены следующие конфигурации:
viewsEnabled
должно быть установлено значение true
.materializationDataset
необходимо указать набор данных, в котором у пользователя GCP есть разрешение на создание таблицы. materializationProject
не является обязательным. Примечание. Как упоминалось в документации BigQuery, запрашиваемые таблицы должны находиться в том же месте, что и materializationDataset
. Кроме того, если таблицы в SQL statement
относятся к проектам, отличным от parentProject
, используйте полное имя таблицы, т. е [project].[dataset].[table]
.
Важно! Эта функция реализуется путем выполнения запроса в BigQuery и сохранения результата во временную таблицу, из которой Spark будет считывать результаты. Это может привести к дополнительным расходам в вашем аккаунте BigQuery.
В коннекторе имеется предварительная поддержка чтения из представлений BigQuery. Обратите внимание, есть несколько предостережений:
collect()
или count()
.materializationProject
и materializationDataset
соответственно. Эти параметры также можно установить глобально, вызвав spark.conf.set(...)
перед чтением представлений..option("viewsEnabled", "true")
), либо установите его глобально, вызвав spark.conf.set("viewsEnabled", "true")
.materializationDataset
должен находиться в том же месте, что и представление.Запись DataFrames в BigQuery может осуществляться двумя методами: прямым и косвенным.
В этом методе данные записываются непосредственно в BigQuery с использованием BigQuery Storage Write API. Чтобы включить эту опцию, установите для опции writeMethod
значение direct
, как показано ниже:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
Запись в существующие секционированные таблицы (секционированные по дате, секционированные по времени приема и секционированные по диапазону) в режиме сохранения APPEND и режиме OVERWRITE (секционированные только по дате и диапазону) полностью поддерживаются соединителем и BigQuery Storage Write API. Описанное ниже использование datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
в данный момент не поддерживается методом прямой записи.
Важно! Цены на BigQuery Storage Write API можно найти на странице цен на прием данных.
Важно: для прямой записи используйте версию 0.24.2 и выше, так как в предыдущих версиях есть ошибка, которая в некоторых случаях может привести к удалению таблицы.
В этом методе данные сначала записываются в GCS, а затем загружаются в BigQuery. Сегмент GCS должен быть настроен для указания временного местоположения данных.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
Данные временно сохраняются в форматах Apache Parquet, Apache ORC или Apache Avro.
Корзину GCS и формат также можно установить глобально с помощью RuntimeConfig Spark следующим образом:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
При потоковой передаче DataFrame в BigQuery каждый пакет записывается так же, как и непоточный DataFrame. Обратите внимание, что необходимо указать местоположение контрольной точки, совместимое с HDFS (например: path/to/HDFS/dir
или gs://checkpoint-bucket/checkpointDir
).
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
Важно! Соединитель не настраивает соединитель GCS, чтобы избежать конфликта с другим соединителем GCS, если он существует. Чтобы использовать возможности записи соединителя, настройте соединитель GCS в своем кластере, как описано здесь.
API поддерживает ряд опций для настройки чтения.
<style> table#propertytable td, table th { word-break:break-word } </style>Свойство | Значение | Использование |
---|---|---|
table | Таблица BigQuery в формате [[project:]dataset.]table . Вместо этого рекомендуется использовать параметр path функции load() / save() . Эта опция устарела и будет удалена в будущей версии.(Устарело) | Чтение/запись |
dataset | Набор данных, содержащий таблицу. Эту опцию следует использовать со стандартными таблицами и представлениями, но не при загрузке результатов запроса. (Необязательно, если не опущено в table ) | Чтение/запись |
project | Идентификатор проекта Google Cloud для таблицы. Эту опцию следует использовать со стандартными таблицами и представлениями, но не при загрузке результатов запроса. (Необязательно. По умолчанию используется проект используемой учетной записи службы) | Чтение/запись |
parentProject | Идентификатор проекта Google Cloud для таблицы, для которой будет выставлен счет за экспорт. (Необязательно. По умолчанию используется проект используемой учетной записи службы) | Чтение/запись |
maxParallelism | Максимальное количество разделов, на которые можно разбить данные. Фактическое число может быть меньше, если BigQuery сочтет данные достаточно маленькими. Если исполнителей недостаточно для планирования чтения для каждого раздела, некоторые разделы могут быть пустыми. Важно: старый параметр ( parallelism ) по-прежнему поддерживается, но в устаревшем режиме. Он будет удален в версии соединителя 1.0.(Необязательно. По умолчанию используется большее из предпочтительных значений MinParallelism и 20 000).) | Читать |
preferredMinParallelism | Предпочтительное минимальное количество разделов для разделения данных. Фактическое число может быть меньше, если BigQuery сочтет данные достаточно маленькими. Если исполнителей недостаточно для планирования чтения для каждого раздела, некоторые разделы могут быть пустыми. (Необязательно. По умолчанию используется наименьшее из трех значений параллелизма приложения по умолчанию и maxParallelism.) | Читать |
viewsEnabled | Позволяет соединителю читать данные из представлений, а не только из таблиц. Пожалуйста, прочтите соответствующий раздел, прежде чем активировать эту опцию. (Необязательно. По умолчанию false ) | Читать |
materializationProject | Идентификатор проекта, в котором будет создано материализованное представление. (Необязательно. По умолчанию используется идентификатор проекта представления) | Читать |
materializationDataset | Набор данных, в котором будет создано материализованное представление. Этот набор данных должен находиться в том же месте, что и представление или запрошенные таблицы. (Необязательно. По умолчанию используется набор данных представления) | Читать |
materializationExpirationTimeInMinutes | Срок действия временной таблицы, содержащей материализованные данные представления или запроса, в минутах. Обратите внимание, что соединитель может повторно использовать временную таблицу из-за использования локального кеша и для сокращения вычислений BigQuery, поэтому очень низкие значения могут привести к ошибкам. Значение должно быть положительным целым числом. (Необязательно. По умолчанию 1440 или 24 часа) | Читать |
readDataFormat | Формат данных для чтения из BigQuery. Опции: ARROW , AVRO (Необязательно. По умолчанию ARROW ) | Читать |
optimizedEmptyProjection | Коннектор использует оптимизированную логику пустой проекции (выбор без каких-либо столбцов), используемую для выполнения count() . Эта логика берет данные непосредственно из метаданных таблицы или выполняет более эффективную операцию `SELECT COUNT(*) WHERE...` в случае наличия фильтра. Вы можете отменить использование этой логики, установив для этой опции значение false .(Необязательно, по умолчанию true ) | Читать |
pushAllFilters | Если установлено значение true , соединитель передает все фильтры, которые Spark может делегировать BigQuery Storage API. Это уменьшает объем данных, которые необходимо отправлять с серверов BigQuery Storage API клиентам Spark. Эта опция устарела и будет удалена в будущей версии.(Необязательно, по умолчанию true )(Устарело) | Читать |
bigQueryJobLabel | Может использоваться для добавления меток к запросу, инициированному коннектором, и загрузки заданий BigQuery. Можно установить несколько меток. (Необязательный) | Читать |
bigQueryTableLabel | Может использоваться для добавления меток к таблице во время записи в таблицу. Можно установить несколько меток. (Необязательный) | Писать |
traceApplicationName | Имя приложения, используемое для отслеживания сеансов чтения и записи BigQuery Storage. Установка имени приложения необходима для установки идентификатора трассировки в сеансах. (Необязательный) | Читать |
traceJobId | Идентификатор задания, используемый для отслеживания сеансов чтения и записи BigQuery Storage. (Необязательно, по умолчанию существует идентификатор задания Dataproc, в противном случае используется идентификатор приложения Spark) | Читать |
createDisposition | Указывает, разрешено ли заданию создавать новые таблицы. Разрешенные значения:
(Необязательно. По умолчанию CREATE_IF_NEEDED). | Писать |
writeMethod | Управляет методом записи данных в BigQuery. Доступные значения — direct для использования BigQuery Storage Write API и indirect , которые сначала записывают данные в GCS, а затем запускают операцию загрузки BigQuery. Посмотреть больше здесь(Необязательно, по умолчанию — indirect ) | Писать |
writeAtLeastOnce | Гарантирует, что данные будут записаны в BigQuery хотя бы один раз. Это меньшая гарантия, чем ровно один раз. Это подходит для сценариев потоковой передачи, в которых данные постоянно записываются небольшими пакетами. (Необязательно. По умолчанию false )Поддерживается только методом записи DIRECT и режимом НЕ Overwrite. | Писать |
temporaryGcsBucket | Корзина GCS, в которой временно хранятся данные перед их загрузкой в BigQuery. Требуется, если не установлено в конфигурации Spark ( spark.conf.set(...) ).Не поддерживается методом записи DIRECT. | Писать |
persistentGcsBucket | Корзина GCS, в которой хранятся данные перед их загрузкой в BigQuery. Если будет сообщено, данные не будут удалены после записи данных в BigQuery. Не поддерживается методом записи DIRECT. | Писать |
persistentGcsPath | Путь GCS, по которому хранятся данные перед их загрузкой в BigQuery. Используется только с persistentGcsBucket .Не поддерживается методом записи DIRECT. | Писать |
intermediateFormat | Формат данных перед загрузкой в BigQuery, значения могут быть «parquet», «orc» или «avro». Чтобы использовать формат Avro, во время выполнения необходимо добавить пакет spark-avro. (Необязательно. По умолчанию — parquet ). Только на запись. Поддерживается только для метода записи INDIRECT. | Писать |
useAvroLogicalTypes | При загрузке из Avro (`.option("intermediateFormat", "avro")`) BigQuery использует базовые типы Avro вместо логических типов [по умолчанию](https://cloud.google.com/bigquery/docs/ loading-data-cloud-storage-avro#ological_types). Указание этого параметра преобразует логические типы Avro в соответствующие им типы данных BigQuery. (Необязательно. По умолчанию false ). Только на запись. | Писать |
datePartition | Раздел даты, в который будут записаны данные. Должна представлять собой строку даты в формате YYYYMMDD . Может использоваться для перезаписи данных одного раздела, например:
(Необязательный). Только на запись. Также может использоваться с различными типами разделов, например: ЧАС: YYYYMMDDHH МЕСЯЦ: YYYYMM ГОД: YYYY Не поддерживается методом записи DIRECT. | Писать |
partitionField | Если это поле указано, таблица секционируется по этому полю. Для разделения по времени укажите вместе с опцией `partitionType`. Для секционирования целочисленного диапазона укажите вместе с тремя параметрами: `partitionRangeStart`, `partitionRangeEnd`, `partitionRangeInterval`. Поле должно быть полем TIMESTAMP или DATE верхнего уровня для разделения по времени или INT64 для разделения по целочисленному диапазону. Его режим должен быть NULLABLE или REQUIRED . Если этот параметр не установлен для таблицы, секционированной по времени, то таблица будет секционирована по псевдостолбцу, на который ссылаются либо через '_PARTITIONTIME' as TIMESTAMP , либо через '_PARTITIONDATE' as DATE .(Необязательный). Не поддерживается методом записи DIRECT. | Писать |
partitionExpirationMs | Количество миллисекунд, в течение которого сохраняется память для разделов в таблице. Хранилище в разделе будет иметь срок действия времени раздела плюс это значение. (Необязательный). Не поддерживается методом записи DIRECT. | Писать |
partitionType | Используется для указания разделения по времени. Поддерживаемые типы: HOUR, DAY, MONTH, YEAR Этот параметр является обязательным для целевой таблицы, секционированной по времени. (Необязательно. По умолчанию — ДЕНЬ, если указано PartitionField). Не поддерживается методом записи DIRECT. | Писать |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | Используется для указания секционирования в целочисленном диапазоне. Эти параметры являются обязательными для секционирования целевой таблицы в целочисленном диапазоне. Необходимо указать все 3 параметра. Не поддерживается методом записи DIRECT. | Писать |
clusteredFields | Строка неповторяющихся столбцов верхнего уровня, разделенных запятой. (Необязательный). | Писать |
allowFieldAddition | Добавляет параметр ALLOW_FIELD_ADDITION SchemaUpdateOption в задание загрузки BigQuery. Допустимые значения: true и false .(Необязательно. По умолчанию false ).Поддерживается только методом записи INDIRECT. | Писать |
allowFieldRelaxation | Добавляет параметр ALLOW_FIELD_RELAXATION SchemaUpdateOption в задание загрузки BigQuery. Допустимые значения: true и false .(Необязательно. По умолчанию false ).Поддерживается только методом записи INDIRECT. | Писать |
proxyAddress | Адрес прокси-сервера. Прокси-сервер должен быть HTTP-прокси, а адрес должен быть в формате «хост:порт». Альтернативно можно установить в конфигурации Spark ( spark.conf.set(...) ) или в конфигурации Hadoop ( fs.gs.proxy.address ).(Необязательно. Требуется только при подключении к GCP через прокси-сервер.) | Чтение/запись |
proxyUsername | Имя пользователя, используемое для подключения к прокси. Альтернативно можно установить в конфигурации Spark ( spark.conf.set(...) ) или в конфигурации Hadoop ( fs.gs.proxy.username ).(Необязательно. Требуется только при подключении к GCP через прокси-сервер с аутентификацией.) | Чтение/запись |
proxyPassword | Пароль, используемый для подключения к прокси. Альтернативно можно установить в конфигурации Spark ( spark.conf.set(...) ) или в конфигурации Hadoop ( fs.gs.proxy.password ).(Необязательно. Требуется только при подключении к GCP через прокси-сервер с аутентификацией.) | Чтение/запись |
httpMaxRetry | Максимальное количество повторов низкоуровневых HTTP-запросов к BigQuery. Альтернативно можно установить в конфигурации Spark ( spark.conf.set("httpMaxRetry", ...) ) или в конфигурации Hadoop ( fs.gs.http.max.retry ).(Необязательно. По умолчанию — 10) | Чтение/запись |
httpConnectTimeout | Таймаут в миллисекундах для установления соединения с BigQuery. Альтернативно можно установить в конфигурации Spark ( spark.conf.set("httpConnectTimeout", ...) ) или в конфигурации Hadoop ( fs.gs.http.connect-timeout ).(Необязательно. По умолчанию — 60 000 мс. 0 — для бесконечного тайм-аута, отрицательное число — для 20 000). | Чтение/запись |
httpReadTimeout | Таймаут в миллисекундах для чтения данных из установленного соединения. Альтернативно можно установить в конфигурации Spark ( spark.conf.set("httpReadTimeout", ...) ) или в конфигурации Hadoop ( fs.gs.http.read-timeout ).(Необязательно. По умолчанию — 60 000 мс. 0 — для бесконечного тайм-аута, отрицательное число — для 20 000). | Читать |
arrowCompressionCodec | Кодек сжатия при чтении таблицы BigQuery при использовании формата Arrow. Опции: ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED . При использовании Java рекомендуемый кодек сжатия — ZSTD .(Необязательно. По умолчанию установлено значение COMPRESSION_UNSPECIFIED , что означает, что сжатие использоваться не будет) | Читать |
responseCompressionCodec | Кодек сжатия, используемый для сжатия данных ReadRowsResponse. Опции: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 .(Необязательно. По умолчанию установлено значение RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , что означает, что сжатие использоваться не будет) | Читать |
cacheExpirationTimeInMinutes | Время истечения срока действия кэша в памяти, в котором хранится информация запроса. Чтобы отключить кеширование, установите значение 0. (Необязательно. По умолчанию 15 минут) | Читать |
enableModeCheckForSchemaFields | Проверяет режим каждого поля в схеме назначения на соответствие режиму в соответствующей схеме исходного поля во время ПРЯМОЙ записи. Значение по умолчанию — true, т.е. проверка выполняется по умолчанию. Если установлено значение false, проверка режима игнорируется. | Писать |
enableListInference | Указывает, следует ли использовать вывод схемы, особенно если выбран режим Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). По умолчанию ложь. | Писать |
bqChannelPoolSize | (Фиксированный) размер пула каналов gRPC, созданного BigQueryReadClient. Для оптимальной производительности это значение должно быть установлено как минимум равным количеству ядер исполнителей кластера. | Читать |
createReadSessionTimeoutInSeconds | Таймаут в секундах для создания ReadSession при чтении таблицы. Для чрезвычайно большой таблицы это значение следует увеличить. (Необязательно. По умолчанию 600 секунд) | Читать |
queryJobPriority | Уровни приоритета, установленные для задания при чтении данных из запроса BigQuery. Разрешенные значения:
(Необязательно. По умолчанию — INTERACTIVE ) | Чтение/запись |
destinationTableKmsKeyName | Описывает ключ шифрования Cloud KMS, который будет использоваться для защиты целевой таблицы BigQuery. Сервисному аккаунту BigQuery, связанному с вашим проектом, требуется доступ к этому ключу шифрования. Дополнительную информацию об использовании CMEK с BigQuery см. [здесь](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id). Примечание. Таблица будет зашифрована ключом только в том случае, если она создана соединителем. Уже существующая незашифрованная таблица не будет зашифрована, просто установив этот параметр. (Необязательный) | Писать |
allowMapTypeConversion | Логическая конфигурация для отключения преобразования записей BigQuery в Spark MapType, когда запись имеет два подполя с именами полей в качестве key и value . Значение по умолчанию — true , что позволяет выполнить преобразование.(Необязательный) | Читать |
spark.sql.sources.partitionOverwriteMode | Конфигурация для указания режима перезаписи при записи, когда таблица секционирована по диапазону/времени. На данный момент поддерживается два режима: STATIC и DYNAMIC . В режиме STATIC перезаписывается вся таблица. В DYNAMIC режиме данные перезаписываются разделами существующей таблицы. Значение по умолчанию — STATIC .(Необязательный) | Писать |
enableReadSessionCaching | Логическая конфигурация для отключения кэширования сеанса чтения. Кэширует сеансы чтения BigQuery, чтобы ускорить планирование запросов Spark. Значение по умолчанию — true .(Необязательный) | Читать |
readSessionCacheDurationMins | Конфигурация для установки продолжительности кэширования сеанса чтения в минутах. Работает только в том случае, если для enableReadSessionCaching установлено true (по умолчанию). Позволяет указать продолжительность кэширования сеансов чтения. Максимально допустимое значение — 300 . Значение по умолчанию — 5 .(Необязательный) | Читать |
bigQueryJobTimeoutInMinutes | Конфигурация для установки времени ожидания задания BigQuery в минутах. Значение по умолчанию — 360 минут.(Необязательный) | Чтение/запись |
snapshotTimeMillis | Временная метка, указанная в миллисекундах, которая используется для чтения снимка таблицы. По умолчанию этот параметр не установлен, и считывается последняя версия таблицы. (Необязательный) | Читать |
bigNumericDefaultPrecision | Альтернативная точность по умолчанию для полей BigNumeric, поскольку значение по умолчанию BigQuery слишком велико для Spark. Значения могут быть от 1 до 38. Это значение по умолчанию используется только в том случае, если поле имеет непараметризованный тип BigNumeric. Обратите внимание: данные могут быть потеряны, если фактическая точность данных превышает указанную. (Необязательный) | Чтение/запись |
bigNumericDefaultScale | Альтернативный масштаб по умолчанию для полей BigNumeric. Значения могут быть от 0 до 38 и меньше, чем bigNumericFieldsPrecision. Это значение по умолчанию используется только в том случае, если поле имеет непараметризованный тип BigNumeric. Обратите внимание: данные могут быть потеряны, если фактический масштаб данных превышает указанный. (Необязательный) | Чтение/запись |
Параметры также можно установить вне кода, используя параметр --conf
для spark-submit
или параметр --properties
для gcloud dataproc submit spark
. Чтобы использовать это, добавьте префикс spark.datasource.bigquery.
к любому из параметров, например spark.conf.set("temporaryGcsBucket", "some-bucket")
также можно установить как --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
.
За исключением DATETIME
и TIME
все типы данных BigQuery сопоставляются с соответствующим типом данных Spark SQL. Вот все сопоставления:
Стандартный тип данных SQL BigQuery | Искровой SQL Тип данных | Примечания |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Пожалуйста, обратитесь к поддержке Numeric и BigNumeric. |
BIGNUMERIC | DecimalType | Пожалуйста, обратитесь к поддержке Numeric и BigNumeric. |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark не имеет типа DATETIME. Строку Spark можно записать в существующий столбец BQ DATETIME, если она имеет формат литералов BQ DATETIME. * Для Spark 3.4+ BQ DATETIME читается как тип TimestampNTZ Spark, т.е. Java LocalDateTime. |
TIME | LongType , StringType * | Spark не имеет типа TIME. Сгенерированные длинные значения, обозначающие микросекунды с полуночи, можно безопасно преобразовать в TimestampType, но в этом случае дата будет считаться текущим днем. Таким образом, время остается таким же длинным, и пользователь может кастовать, если захочет. При приведении к временной метке TIME возникают те же проблемы с TimeZone, что и DATETIME. * Строку Spark можно записать в существующий столбец BQ TIME, если она имеет формат литералов BQ TIME. |
JSON | StringType | Spark не имеет типа JSON. Значения читаются как строка. Чтобы записать JSON обратно в BigQuery, ТРЕБУЮТСЯ следующие условия:
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery не имеет типа MAP, поэтому, как и другие преобразования, такие как Apache Avro и задания загрузки BigQuery, соединитель преобразует карту Spark в REPEATED STRUCT<key,value>. Это означает, что хотя запись и чтение карт доступны, запуск SQL в BigQuery, использующий семантику карт, не поддерживается. Чтобы обратиться к значениям карты с помощью BigQuery SQL, ознакомьтесь с документацией BigQuery. Из-за этой несовместимости применяются несколько ограничений:
|
Поддерживаются Spark ML Vector и Matrix, включая их плотные и разреженные версии. Данные сохраняются как BigQuery RECORD. Обратите внимание, что к описанию поля добавляется суффикс, который включает тип искры поля.
Чтобы записать эти типы в BigQuery, используйте промежуточный формат ORC или Avro и сделайте их столбцом строки (т. е. не полем в структуре).
BigNumeric в BigQuery имеет точность 76,76 (77-я цифра является частичной) и масштаб 38. Поскольку эта точность и масштаб выходят за рамки поддержки DecimalType в Spark (масштаб 38 и точность 38), это означает, что поля BigNumeric с точностью больше 38 нельзя использовать. . Как только это ограничение Spark будет обновлено, соединитель будет обновлен соответствующим образом.
Преобразование Spark Decimal/BigQuery Numeric пытается сохранить параметризацию типа, т. е. NUMERIC(10,2)
будет преобразовано в Decimal(10,2)
и наоборот. Однако обратите внимание, что бывают случаи, когда параметры теряются. Это означает, что параметры будут возвращены к значениям по умолчанию — NUMERIC (38,9) и BIGNUMERIC(76,38). Это означает, что на данный момент чтение BigNumeric поддерживается только из стандартной таблицы, но не из представления BigQuery или при чтении данных из запроса BigQuery.
Коннектор автоматически вычисляет столбец и фильтрует оператор SELECT
DataFrame, например
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
фильтрует word
столбца и опускает фильтр предиката word = 'hamlet' or word = 'Claudius'
.
Если вы не хотите отправлять несколько запросов на чтение в BigQuery, вы можете кэшировать DataFrame перед фильтрацией, например:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
Вы также можете вручную указать параметр filter
, который будет переопределять автоматическое перемещение вниз, а Spark выполнит остальную часть фильтрации в клиенте.
Псевдостолбцы _PARTITIONDATE и _PARTITIONTIME не являются частью схемы таблицы. Поэтому для запроса по секциям секционированных таблиц не используйте методwhere(), показанный выше. Вместо этого добавьте параметр фильтра следующим образом:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
По умолчанию коннектор создает один раздел на каждые 400 МБ в читаемой таблице (перед фильтрацией). Это примерно должно соответствовать максимальному количеству читателей, поддерживаемому BigQuery Storage API. Это можно настроить явно с помощью свойства maxParallelism
. BigQuery может ограничивать количество разделов в зависимости от ограничений сервера.
Чтобы поддерживать отслеживание использования ресурсов BigQuery, коннекторы предлагают следующие варианты тегирования ресурсов BigQuery:
Коннектор может запускать задания загрузки и запроса BigQuery. Добавление меток к заданиям осуществляется следующим образом:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
Это создаст метки cost_center
= analytics
и usage
= nightly_etl
.
Используется для аннотирования сеансов чтения и записи. Идентификатор трассировки имеет формат Spark:ApplicationName:JobID
. Это добровольная опция, и чтобы использовать ее, пользователю необходимо установить traceApplicationName
. JobID автоматически генерируется на основе идентификатора задания Dataproc с резервным идентификатором приложения Spark (например, application_1648082975639_0001
). Идентификатор задания можно переопределить, задав параметр traceJobId
. Обратите внимание, что общая длина идентификатора трассировки не может превышать 256 символов.
Разъем можно использовать в ноутбуках Jupyter, даже если он не установлен в кластере Spark. Его можно добавить как внешний jar, используя следующий код:
Питон:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
Скала:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
Если кластер Spark использует Scala 2.12 (это необязательно для Spark 2.4.x, обязательно в 3.0.x), тогда соответствующий пакет — com.google.cloud.spark:spark-bigquery-with-dependents_ 2.12 :0.41.0. Чтобы узнать, какая версия Scala используется, запустите следующий код:
Питон:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
Скала:
scala . util . Properties . versionString
Если вы не хотите использовать неявный Scala API spark.read.bigquery("TABLE_ID")
, нет необходимости компилировать с использованием соединителя.
Чтобы включить соединитель в проект:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark заполняет множество показателей, которые конечный пользователь может найти на странице истории Spark. Но все эти показатели связаны с искрой и собираются неявно, без каких-либо изменений в разъеме. Но есть несколько метрик, которые заполняются из BigQuery и в настоящее время видны в журналах приложений, которые можно прочитать в журналах драйвера/исполнителя.
Начиная с Spark 3.2, Spark предоставил API для разоблачения пользовательских метрик на странице Spark UI https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /Custommetric.html
В настоящее время, используя этот API, Connector раскрывает следующие метрики BigQuery во время чтения
<style> Таблица#Metricstable TD, таблица Th {Word-Break: Break-Word} </style>Метрическое название | Описание |
---|---|
bytes read | Количество BigQuery Bytes читается |
rows read | Количество рядов BigQuery читается |
scan time | Количество времени, проведенного между ответом на чтение строк, запрошенное для получения для всех исполнителей, в миллисекундах. |
parse time | Количество времени, проведенного на анализ рядов, читается по всем исполнителям, в миллисекундах. |
spark time | Количество времени, проведенного в Spark для обработки запросов (то есть, кроме сканирования и анализа), для всех исполнителей, в миллисекундах. |
Примечание. Чтобы использовать метрики на странице UI Spark, вам необходимо убедиться, что spark-bigquery-metrics-0.41.0.jar
-это путь класса, прежде чем запустить исторический сервер, а версия Connector- spark-3.2
или выше.
Смотрите документацию по ценам BigQuery.
Вы можете вручную установить количество разделов со свойством maxParallelism
. BigQuery может предоставить меньше разделов, чем вы просите. См. Настройка разделения.
Вы также всегда можете перераспределять после прочтения в Spark.
Если существует слишком много разделов, могут быть превышены квоты CreateWritestream или пропускная способность. Это происходит потому, что хотя данные в каждом разделе обрабатываются последовательно, независимые разделы могут обрабатываться параллельно на разных узлах в кластере Spark. Как правило, для обеспечения максимальной устойчивой пропускной способности вы должны подать запрос на увеличение квоты. Тем не менее, вы также можете вручную уменьшить количество разделов, записанных, вызывая coalesce
на DataFrame, чтобы смягчить эту проблему.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
Правило эмпирического правила состоит в том, чтобы иметь ручку одного раздела не менее 1 ГБ данных.
Также обратите внимание, что работа, работающая с включенной собственностью writeAtLeastOnce
, не столкнется с ошибками CreateWriteStream Quatats.
Разъем нуждается в экземпляре Googlecredentials, чтобы подключиться к API BigQuery. Есть несколько вариантов для его предоставления:
GOOGLE_APPLICATION_CREDENTIALS
, как описано здесь. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
. AccessTokenProvider
должен быть реализован на Java или на другом языке JVM, таком как Scala или Kotlin. Он должен иметь либо конструктор без арг, либо конструктор, принимающий один аргумент java.util.String
. Этот параметр конфигурации может быть предоставлен с использованием опции gcpAccessTokenProviderConfig
. Если это не предоставлено, то будет вызван конструктор без арга. JAR, содержащая реализацию, должна находиться на класте. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
Выражаемая учетная запись может быть настроена для конкретного имени пользователя и имени группы, или для всех пользователей по умолчанию, используя свойства ниже:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(не установлен по умолчанию)
Ориентировочное возмещение учетной записи для конкретного пользователя.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(не установлен по умолчанию)
Ориентировочное возмездие счета для конкретной группы.
gcpImpersonationServiceAccount
(не установлен по умолчанию)
По умолчанию Сервисная учетная запись подражания всем пользователям.
Если какое-либо из вышеперечисленных свойств установлено, то указанная учетная запись сервиса будет выдаваться за счет получения недолговечных учетных данных при доступе к BigQuery.
Если установлено более одного свойства, то учетная запись службы, связанная с именем пользователя, будет иметь приоритет над учетной записью службы, связанной с именем группы для соответствующего пользователя и группы, которая, в свою очередь, будет иметь приоритет над имрекацией учетной записи службы по умолчанию.
Для более простого приложения, где обновление токена доступа не требуется, другой альтернативой является передача токена Access в качестве опции конфигурации gcpAccessToken
. Вы можете получить токен Access, запустив gcloud auth application-default print-access-token
.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
Важно: CredentialsProvider
и AccessTokenProvider
должны быть реализованы на Java или на другом языке JVM, таком как Scala или Kotlin. JAR, содержащая реализацию, должна находиться на класте.
Примечание: должен быть предоставлен только один из вышеперечисленных вариантов.
Чтобы подключиться к прямому прокси и для аутентификации учетных данных пользователя, настройте следующие параметры.
proxyAddress
: адрес прокси -сервера. Прокси должен быть HTTP -прокси, и адрес должен быть в host:port
.
proxyUsername
: имя пользователя, используемое для подключения к прокси.
proxyPassword
: пароль, используемый для подключения к прокси.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
Те же самые прокси -параметры также могут быть установлены по всему миру, используя Spark's RuntimeConfig, как это:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
Вы также можете установить следующее в конфигурации Hadoop.
fs.gs.proxy.address
(аналогично «proxyaddress»), fs.gs.proxy.username
(аналогично «proxyusername») и fs.gs.proxy.password
(аналогично «proxypassword»).
Если один и тот же параметр установлен в нескольких местах, порядок приоритета заключается в следующем:
опция ("key", "value")> spark.conf> конфигурация Hadoop