Der Connector unterstützt das Lesen von Google BigQuery-Tabellen in die DataFrames von Spark und das Zurückschreiben von DataFrames in BigQuery. Dies erfolgt durch die Verwendung der Spark SQL-Datenquellen-API zur Kommunikation mit BigQuery.
Die Storage API streamt Daten parallel direkt aus BigQuery über gRPC, ohne Google Cloud Storage als Vermittler zu nutzen.
Es bietet eine Reihe von Vorteilen gegenüber der Verwendung des bisherigen exportbasierten Leseflusses, die im Allgemeinen zu einer besseren Leseleistung führen sollten:
Es bleiben keine temporären Dateien im Google Cloud Storage. Zeilen werden mithilfe der Wire-Formate Arrow oder Avro direkt von BigQuery-Servern gelesen.
Die neue API ermöglicht die Spalten- und Prädikatfilterung, um nur die Daten zu lesen, die Sie interessieren.
Da BigQuery auf einem spaltenbasierten Datenspeicher basiert, können Daten effizient gestreamt werden, ohne dass alle Spalten gelesen werden müssen.
Die Storage-API unterstützt das beliebige Pushdown von Prädikatfiltern. Connector-Version 0.8.0-Beta und höher unterstützt das Pushdown beliebiger Filter an Bigquery.
Es gibt ein bekanntes Problem in Spark, das das Pushdown von Filtern für verschachtelte Felder nicht zulässt. Beispielsweise werden Filter wie address.city = "Sunnyvale"
nicht an Bigquery weitergeleitet.
Die API gleicht die Datensätze zwischen den Lesern neu aus, bis sie alle abgeschlossen sind. Das bedeutet, dass alle Kartenphasen fast gleichzeitig abgeschlossen werden. In diesem Blogartikel erfahren Sie, wie dynamisches Sharding in Google Cloud Dataflow auf ähnliche Weise verwendet wird.
Weitere Einzelheiten finden Sie unter Partitionierung konfigurieren.
Befolgen Sie diese Anweisungen.
Wenn Sie keine Apache Spark-Umgebung haben, können Sie einen Cloud Dataproc-Cluster mit vorkonfigurierter Authentifizierung erstellen. In den folgenden Beispielen wird davon ausgegangen, dass Sie Cloud Dataproc verwenden, Sie können spark-submit
jedoch auf jedem Cluster verwenden.
Jeder Dataproc-Cluster, der die API verwendet, benötigt die Bereiche „bigquery“ oder „cloud-platform“. Dataproc-Cluster haben standardmäßig den Bereich „bigquery“, daher sollten die meisten Cluster in aktivierten Projekten standardmäßig funktionieren, z
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
Die neueste Version des Connectors ist unter den folgenden Links öffentlich verfügbar:
Version | Link |
---|---|
Spark 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (HTTP-Link) |
Spark 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (HTTP-Link) |
Spark 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (HTTP-Link) |
Spark 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (HTTP-Link) |
Spark 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (HTTP-Link) |
Spark 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (HTTP-Link) |
Scala 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (HTTP-Link) |
Scala 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (HTTP-Link) |
Scala 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP-Link) |
Die ersten sechs Versionen sind Java-basierte Konnektoren für Spark 2.4/3.1/3.2/3.3/3.4/3.5 aller Scala-Versionen, die auf den neuen Datenquellen-APIs (Datenquellen-API v2) von Spark basieren.
Bei den letzten beiden Konnektoren handelt es sich um Scala-basierte Konnektoren. Bitte verwenden Sie das für Ihre Spark-Installation relevante JAR, wie unten beschrieben.
Connector Spark | 2.3 | 2.4 | 3,0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
spark-3.5-bigquery | ✓ | |||||||
spark-3.4-bigquery | ✓ | ✓ | ||||||
spark-3.3-bigquery | ✓ | ✓ | ✓ | |||||
spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
spark-2.4-bigquery | ✓ | |||||||
spark-bigquery-with-dependencies_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
spark-bigquery-with-dependencies_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
spark-bigquery-with-dependencies_2.11 | ✓ | ✓ |
Connector Dataproc-Image | 1.3 | 1.4 | 1.5 | 2,0 | 2.1 | 2.2 | Serverlos Bild 1.0 | Serverlos Bild 2.0 | Serverlos Bild 2.1 | Serverlos Bild 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
spark-3.5-bigquery | ✓ | ✓ | ||||||||
spark-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
spark-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
spark-2.4-bigquery | ✓ | ✓ | ||||||||
spark-bigquery-with-dependencies_2.13 | ✓ | ✓ | ✓ | |||||||
spark-bigquery-with-dependencies_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
spark-bigquery-with-dependencies_2.11 | ✓ | ✓ |
Der Connector ist auch im Maven Central-Repository verfügbar. Es kann mit der Option --packages
oder der Konfigurationseigenschaft spark.jars.packages
verwendet werden. Verwenden Sie den folgenden Wert
Version | Verbindungsartefakt |
---|---|
Spark 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
Spark 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
Spark 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
Spark 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
Spark 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
Spark 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
Scala 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
Scala 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
Scala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
Dataproc-Cluster, die mit Image 2.1 und höher erstellt wurden, oder Batches, die den serverlosen Dataproc-Dienst verwenden, verfügen über einen integrierten Spark BigQuery-Connector. Die Verwendung der Standardkonfigurationen --jars
oder --packages
(oder alternativ der Konfiguration spark.jars
/ spark.jars.packages
) hilft in diesem Fall nicht, da der integrierte Connector Vorrang hat.
Um eine andere Version als die integrierte Version zu verwenden, führen Sie bitte einen der folgenden Schritte aus:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
oder --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
um den Cluster mit einem anderen JAR zu erstellen. Die URL kann auf jede gültige Connector-JAR für die Spark-Version des Clusters verweisen.--properties dataproc.sparkBqConnector.version=0.41.0
oder --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
um den Stapel mit einem anderen Glas zu erstellen. Die URL kann auf jede gültige Connector-JAR für die Spark-Version der Laufzeit verweisen. Sie können eine einfache PySpark-Wortzählung für die API ohne Kompilierung ausführen, indem Sie ausführen
Dataproc-Image 1.5 und höher
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Dataproc-Image 1.4 und niedriger
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
Der Connector verwendet die sprachübergreifende Spark SQL-Datenquellen-API:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
oder die nur implizite Scala-API:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
Weitere Informationen finden Sie in den zusätzlichen Codebeispielen in Python, Scala und Java.
Mit dem Connector können Sie jede Standard-SQL-SELECT-Abfrage in BigQuery ausführen und deren Ergebnisse direkt in einen Spark-Datenrahmen abrufen. Dies geht ganz einfach, wie im folgenden Codebeispiel beschrieben:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
Was das Ergebnis ergibt
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
Eine zweite Möglichkeit besteht darin, die query
wie folgt zu verwenden:
df = spark.read.format("bigquery").option("query", sql).load()
Beachten Sie, dass die Ausführung schneller sein sollte, da nur das Ergebnis über die Leitung übertragen wird. Auf ähnliche Weise können die Abfragen JOINs effizienter einschließen als die Ausführung von Joins auf Spark oder andere BigQuery-Funktionen wie Unterabfragen, benutzerdefinierte BigQuery-Funktionen, Platzhaltertabellen, BigQuery ML und mehr nutzen.
Um diese Funktion nutzen zu können, MÜSSEN die folgenden Konfigurationen festgelegt werden:
viewsEnabled
muss auf true
gesetzt sein.materializationDataset
muss auf einen Datensatz festgelegt werden, bei dem der GCP-Benutzer über die Berechtigung zur Tabellenerstellung verfügt. materializationProject
ist optional. Hinweis: Wie in der BigQuery-Dokumentation erwähnt, müssen sich die abgefragten Tabellen am selben Speicherort wie materializationDataset
befinden. Wenn die Tabellen in der SQL statement
außerdem aus anderen Projekten als dem parentProject
stammen, verwenden Sie den vollständig qualifizierten Tabellennamen, z. B. [project].[dataset].[table]
.
Wichtig: Diese Funktion wird implementiert, indem die Abfrage in BigQuery ausgeführt und das Ergebnis in einer temporären Tabelle gespeichert wird, aus der Spark die Ergebnisse liest. Dadurch können zusätzliche Kosten für Ihr BigQuery-Konto entstehen.
Der Connector bietet vorläufige Unterstützung für das Lesen aus BigQuery-Ansichten. Bitte beachten Sie, dass es einige Einschränkungen gibt:
collect()
oder count()
Aktion ausgeführt wird.materializationProject
bzw. materializationDataset
konfiguriert werden. Diese Optionen können auch global festgelegt werden, indem spark.conf.set(...)
vor dem Lesen der Ansichten aufgerufen wird..option("viewsEnabled", "true")
) oder legen Sie sie global fest, indem Sie spark.conf.set("viewsEnabled", "true")
aufrufen.materializationDataset
am selben Ort wie die Ansicht befinden.Das Schreiben von DataFrames in BigQuery kann mit zwei Methoden erfolgen: direkt und indirekt.
Bei dieser Methode werden die Daten mithilfe der BigQuery Storage Write API direkt in BigQuery geschrieben. Um diese Option zu aktivieren, setzen Sie bitte die Option writeMethod
auf direct
, wie unten gezeigt:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
Das Schreiben in vorhandene partitionierte Tabellen (nach Datum, Aufnahmezeit und Bereich) im APPEND-Speichermodus und im OVERWRITE-Modus (nur Datum und Bereich) wird vom Connector und der BigQuery Storage Write API vollständig unterstützt. Die unten beschriebene Verwendung von datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
und partitionRangeInterval
wird derzeit von der direkten Schreibmethode nicht unterstützt.
Wichtig: Informationen zu den Preisen für die BigQuery Storage Write API finden Sie auf der Seite mit den Preisen für die Datenaufnahme.
Wichtig: Bitte verwenden Sie für direkte Schreibvorgänge Version 0.24.2 und höher, da frühere Versionen einen Fehler aufweisen, der in bestimmten Fällen zum Löschen einer Tabelle führen kann.
Bei dieser Methode werden die Daten zuerst in GCS geschrieben und dann in BigQuery geladen. Ein GCS-Bucket muss konfiguriert werden, um den temporären Datenspeicherort anzugeben.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
Die vorübergehende Speicherung der Daten erfolgt in den Formaten Apache Parquet, Apache ORC oder Apache Avro.
Der GCS-Bucket und das Format können auch global mit Sparks RuntimeConfig wie folgt festgelegt werden:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
Beim Streamen eines DataFrames an BigQuery wird jeder Batch auf die gleiche Weise geschrieben wie bei einem nicht streamenden DataFrame. Beachten Sie, dass ein HDFS-kompatibler Prüfpunktspeicherort (z. B. path/to/HDFS/dir
oder gs://checkpoint-bucket/checkpointDir
) angegeben werden muss.
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
Wichtig: Der Connector konfiguriert nicht den GCS-Connector, um Konflikte mit einem anderen GCS-Connector (sofern vorhanden) zu vermeiden. Um die Schreibfunktionen des Connectors zu nutzen, konfigurieren Sie bitte den GCS-Connector in Ihrem Cluster wie hier beschrieben.
Die API unterstützt eine Reihe von Optionen zum Konfigurieren des Lesevorgangs
<style> table#propertytable td, table th { word-break:break-word } </style>Eigentum | Bedeutung | Verwendung |
---|---|---|
table | Die BigQuery-Tabelle im Format [[project:]dataset.]table . Es wird empfohlen, stattdessen den path von load() / save() zu verwenden. Diese Option ist veraltet und wird in einer zukünftigen Version entfernt.(Veraltet) | Lesen/Schreiben |
dataset | Der Datensatz, der die Tabelle enthält. Diese Option sollte mit Standardtabellen und -ansichten verwendet werden, jedoch nicht beim Laden von Abfrageergebnissen. (Optional, sofern nicht in table weggelassen) | Lesen/Schreiben |
project | Die Google Cloud-Projekt-ID der Tabelle. Diese Option sollte mit Standardtabellen und -ansichten verwendet werden, jedoch nicht beim Laden von Abfrageergebnissen. (Optional. Standardmäßig das Projekt des verwendeten Dienstkontos) | Lesen/Schreiben |
parentProject | Die Google Cloud-Projekt-ID der Tabelle, für die der Export in Rechnung gestellt werden soll. (Optional. Standardmäßig das Projekt des verwendeten Dienstkontos) | Lesen/Schreiben |
maxParallelism | Die maximale Anzahl von Partitionen, in die die Daten aufgeteilt werden sollen. Die tatsächliche Zahl kann geringer sein, wenn BigQuery die Daten für klein genug hält. Wenn nicht genügend Executoren vorhanden sind, um einen Leser pro Partition zu planen, sind einige Partitionen möglicherweise leer. Wichtig: Der alte Parameter ( parallelism ) wird weiterhin unterstützt, jedoch im veralteten Modus. Es wird in Version 1.0 des Connectors entfernt.(Optional. Standardmäßig wird der größere Wert von „preferredMinParallelism“ und „20.000“ verwendet.) | Lesen |
preferredMinParallelism | Die bevorzugte Mindestanzahl an Partitionen, in die die Daten aufgeteilt werden sollen. Die tatsächliche Zahl kann geringer sein, wenn BigQuery die Daten für klein genug hält. Wenn nicht genügend Executoren vorhanden sind, um einen Leser pro Partition zu planen, sind einige Partitionen möglicherweise leer. (Optional. Der Standardwert ist der kleinste von drei Malen der Standardparallelität und maxParallelism der Anwendung.) | Lesen |
viewsEnabled | Ermöglicht dem Connector das Lesen aus Ansichten und nicht nur aus Tabellen. Bitte lesen Sie den entsprechenden Abschnitt, bevor Sie diese Option aktivieren. (Optional. Standardmäßig ist false ) | Lesen |
materializationProject | Die Projekt-ID, in der die materialisierte Ansicht erstellt werden soll (Optional. Standardmäßig wird die Projekt-ID der Ansicht verwendet) | Lesen |
materializationDataset | Der Datensatz, in dem die materialisierte Ansicht erstellt werden soll. Dieser Datensatz sollte sich am selben Ort befinden wie die Ansicht oder die abgefragten Tabellen. (Optional. Standardmäßig wird der Datensatz der Ansicht verwendet) | Lesen |
materializationExpirationTimeInMinutes | Die Ablaufzeit der temporären Tabelle, die die materialisierten Daten einer Ansicht oder Abfrage enthält, in Minuten. Beachten Sie, dass der Connector die temporäre Tabelle aufgrund der Verwendung des lokalen Caches und zur Reduzierung der BigQuery-Berechnungen möglicherweise wiederverwendet, sodass sehr niedrige Werte zu Fehlern führen können. Der Wert muss eine positive ganze Zahl sein. (Optional. Standardmäßig 1440 oder 24 Stunden) | Lesen |
readDataFormat | Datenformat zum Lesen aus BigQuery. Optionen: ARROW , AVRO (Optional. Standardmäßig ist ARROW ) | Lesen |
optimizedEmptyProjection | Der Connector verwendet eine optimierte leere Projektionslogik (Auswahl ohne Spalten), die für die Ausführung count() verwendet wird. Diese Logik übernimmt die Daten direkt aus den Tabellenmetadaten oder führt ein sehr effizientes „SELECT COUNT(*) WHERE...“ durch, falls ein Filter vorhanden ist. Sie können die Verwendung dieser Logik abbrechen, indem Sie diese Option auf false setzen.(Optional, standardmäßig true ) | Lesen |
pushAllFilters | Wenn dieser Wert auf true gesetzt ist, überträgt der Connector alle Filter, die Spark an die BigQuery Storage API delegieren kann. Dadurch wird die Datenmenge reduziert, die von BigQuery Storage API-Servern an Spark-Clients gesendet werden muss. Diese Option ist veraltet und wird in einer zukünftigen Version entfernt.(Optional, standardmäßig true )(Veraltet) | Lesen |
bigQueryJobLabel | Kann verwendet werden, um Beschriftungen zur vom Connector initiierten Abfrage hinzuzufügen und BigQuery-Jobs zu laden. Es können mehrere Beschriftungen festgelegt werden. (Optional) | Lesen |
bigQueryTableLabel | Kann verwendet werden, um Beschriftungen zur Tabelle hinzuzufügen, während in eine Tabelle geschrieben wird. Es können mehrere Beschriftungen festgelegt werden. (Optional) | Schreiben |
traceApplicationName | Anwendungsname, der zum Verfolgen von BigQuery Storage-Lese- und Schreibsitzungen verwendet wird. Das Festlegen des Anwendungsnamens ist erforderlich, um die Trace-ID für die Sitzungen festzulegen. (Optional) | Lesen |
traceJobId | Job-ID, die zum Verfolgen von BigQuery Storage-Lese- und Schreibsitzungen verwendet wird. (Optional, standardmäßig wird die Dataproc-Job-ID verwendet, andernfalls wird die Spark-Anwendungs-ID verwendet.) | Lesen |
createDisposition | Gibt an, ob der Job neue Tabellen erstellen darf. Die zulässigen Werte sind:
(Optional. Standardmäßig CREATE_IF_NEEDED). | Schreiben |
writeMethod | Steuert die Methode, mit der die Daten in BigQuery geschrieben werden. Verfügbare Werte sind direct zur Verwendung der BigQuery Storage Write API und indirect , die die Daten zuerst in GCS schreiben und dann einen BigQuery-Ladevorgang auslösen. Weitere Informationen finden Sie hier(Optional, standardmäßig indirect ) | Schreiben |
writeAtLeastOnce | Garantiert, dass Daten mindestens einmal in BigQuery geschrieben werden. Dies ist eine geringere Garantie als genau einmal. Dies eignet sich für Streaming-Szenarien, in denen Daten kontinuierlich in kleinen Mengen geschrieben werden. (Optional. Standardmäßig ist false )Wird nur von der Schreibmethode „DIRECT“ unterstützt und der Modus ist NICHT „Overwrite“. | Schreiben |
temporaryGcsBucket | Der GCS-Bucket, der die Daten vorübergehend speichert, bevor sie in BigQuery geladen werden. Erforderlich, sofern nicht in der Spark-Konfiguration festgelegt ( spark.conf.set(...) ).Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
persistentGcsBucket | Der GCS-Bucket, der die Daten enthält, bevor sie in BigQuery geladen werden. Wenn Sie darüber informiert werden, werden die Daten nach dem Schreiben der Daten in BigQuery nicht gelöscht. Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
persistentGcsPath | Der GCS-Pfad, der die Daten enthält, bevor sie in BigQuery geladen werden. Wird nur mit persistentGcsBucket verwendet.Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
intermediateFormat | Das Format der Daten vor dem Laden in BigQuery. Die Werte können entweder „parquet“, „orc“ oder „avro“ sein. Um das Avro-Format verwenden zu können, muss das Paket spark-avro zur Laufzeit hinzugefügt werden. (Optional. Standardmäßig ist parquet ). Nur beim Schreiben. Wird nur für die Schreibmethode „INDIRECT“ unterstützt. | Schreiben |
useAvroLogicalTypes | Beim Laden aus Avro (`.option("intermediateFormat", "avro")` verwendet BigQuery [standardmäßig] die zugrunde liegenden Avro-Typen anstelle der logischen Typen (https://cloud.google.com/bigquery/docs/ Loading-Data-Cloud-Storage-Avro#logical_types). Durch die Bereitstellung dieser Option werden logische Avro-Typen in die entsprechenden BigQuery-Datentypen konvertiert. (Optional. Der Standardwert ist false ). Nur beim Schreiben. | Schreiben |
datePartition | Die Datumspartition, in die die Daten geschrieben werden. Sollte eine Datumszeichenfolge im Format YYYYMMDD sein. Kann verwendet werden, um die Daten einer einzelnen Partition wie folgt zu überschreiben:
(Optional). Nur beim Schreiben. Kann auch mit verschiedenen Partitionstypen verwendet werden wie: STUNDE: YYYYMMDDHH MONAT: YYYYMM JAHR: YYYY Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
partitionField | Wenn dieses Feld angegeben ist, wird die Tabelle nach diesem Feld partitioniert. Geben Sie für die Zeitpartitionierung zusammen mit der Option „partitionType“ an. Geben Sie für die Ganzzahlbereichspartitionierung zusammen mit den drei Optionen an: „partitionRangeStart“, „partitionRangeEnd“, „partitionRangeInterval“. Das Feld muss ein TIMESTAMP- oder DATE-Feld der obersten Ebene für die Zeitpartitionierung oder INT64 für die Ganzzahlbereichspartitionierung sein. Sein Modus muss NULLABLE oder REQUIRED sein. Wenn die Option für eine nach Zeit partitionierte Tabelle nicht festgelegt ist, wird die Tabelle nach Pseudospalten partitioniert, auf die entweder über '_PARTITIONTIME' as TIMESTAMP Typ oder '_PARTITIONDATE' as DATE Typ verwiesen wird.(Optional). Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
partitionExpirationMs | Anzahl der Millisekunden, für die der Speicherplatz für Partitionen in der Tabelle beibehalten werden soll. Der Speicher in einer Partition hat eine Ablaufzeit, die sich aus der Partitionszeit plus diesem Wert ergibt. (Optional). Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
partitionType | Wird verwendet, um die Zeitpartitionierung anzugeben. Unterstützte Typen sind: HOUR, DAY, MONTH, YEAR Diese Option ist für die Zeitpartitionierung einer Zieltabelle obligatorisch . (Optional. Standardmäßig ist DAY, wenn PartitionField angegeben ist). Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | Wird verwendet, um die Partitionierung im Ganzzahlbereich anzugeben. Diese Optionen sind obligatorisch , damit eine Zieltabelle eine Ganzzahlbereichspartitionierung erhält. Alle 3 Optionen müssen angegeben werden. Wird von der Schreibmethode „DIRECT“ nicht unterstützt. | Schreiben |
clusteredFields | Eine Folge nicht wiederholter Spalten der obersten Ebene, getrennt durch Komma. (Optional). | Schreiben |
allowFieldAddition | Fügt die ALLOW_FIELD_ADDITION SchemaUpdateOption zum BigQuery LoadJob hinzu. Zulässige Werte sind true und false .(Optional. Standardmäßig ist false ).Wird nur von der Schreibmethode „INDIRECT“ unterstützt. | Schreiben |
allowFieldRelaxation | Fügt die ALLOW_FIELD_RELAXATION SchemaUpdateOption zum BigQuery LoadJob hinzu. Zulässige Werte sind true und false .(Optional. Standardmäßig ist false ).Wird nur von der Schreibmethode „INDIRECT“ unterstützt. | Schreiben |
proxyAddress | Adresse des Proxyservers. Der Proxy muss ein HTTP-Proxy sein und die Adresse sollte im Format „Host:Port“ vorliegen. Kann alternativ in der Spark-Konfiguration ( spark.conf.set(...) ) oder in der Hadoop-Konfiguration ( fs.gs.proxy.address ) festgelegt werden.(Optional. Nur erforderlich, wenn eine Verbindung zur GCP über einen Proxy hergestellt wird.) | Lesen/Schreiben |
proxyUsername | Der Benutzername, der für die Verbindung mit dem Proxy verwendet wird. Kann alternativ in der Spark-Konfiguration ( spark.conf.set(...) ) oder in der Hadoop-Konfiguration ( fs.gs.proxy.username ) festgelegt werden.(Optional. Nur erforderlich, wenn eine Verbindung zur GCP über einen Proxy mit Authentifizierung hergestellt wird.) | Lesen/Schreiben |
proxyPassword | Das Passwort, das für die Verbindung zum Proxy verwendet wird. Kann alternativ in der Spark-Konfiguration ( spark.conf.set(...) ) oder in der Hadoop-Konfiguration ( fs.gs.proxy.password ) festgelegt werden.(Optional. Nur erforderlich, wenn eine Verbindung zur GCP über einen Proxy mit Authentifizierung hergestellt wird.) | Lesen/Schreiben |
httpMaxRetry | Die maximale Anzahl von Wiederholungen für die Low-Level-HTTP-Anfragen an BigQuery. Kann alternativ in der Spark-Konfiguration ( spark.conf.set("httpMaxRetry", ...) ) oder in der Hadoop-Konfiguration ( fs.gs.http.max.retry ) festgelegt werden.(Optional. Standard ist 10) | Lesen/Schreiben |
httpConnectTimeout | Das Timeout in Millisekunden zum Herstellen einer Verbindung mit BigQuery. Kann alternativ in der Spark-Konfiguration ( spark.conf.set("httpConnectTimeout", ...) ) oder in der Hadoop-Konfiguration ( fs.gs.http.connect-timeout ) festgelegt werden.(Optional. Der Standardwert ist 60.000 ms. 0 für eine unendliche Zeitüberschreitung, eine negative Zahl für 20.000) | Lesen/Schreiben |
httpReadTimeout | Das Timeout in Millisekunden zum Lesen von Daten aus einer bestehenden Verbindung. Kann alternativ in der Spark-Konfiguration ( spark.conf.set("httpReadTimeout", ...) ) oder in der Hadoop-Konfiguration ( fs.gs.http.read-timeout ) festgelegt werden.(Optional. Der Standardwert ist 60.000 ms. 0 für ein unendliches Timeout, eine negative Zahl für 20.000) | Lesen |
arrowCompressionCodec | Komprimierungscodec beim Lesen aus einer BigQuery-Tabelle bei Verwendung des Arrow-Formats. Optionen: ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED . Der empfohlene Komprimierungscodec ist ZSTD bei Verwendung von Java.(Optional. Der Standardwert ist COMPRESSION_UNSPECIFIED , was bedeutet, dass keine Komprimierung verwendet wird.) | Lesen |
responseCompressionCodec | Komprimierungscodec, der zum Komprimieren der ReadRowsResponse-Daten verwendet wird. Optionen: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 (Optional. Der Standardwert ist RESPONSE_COMPRESSION_CODEC_UNSPECIFIED was bedeutet, dass keine Komprimierung verwendet wird.) | Lesen |
cacheExpirationTimeInMinutes | Die Ablaufzeit des In-Memory-Cache, in dem Abfrageinformationen gespeichert werden. Um das Caching zu deaktivieren, legen Sie den Wert auf 0 fest. (Optional. Standardmäßig 15 Minuten) | Lesen |
enableModeCheckForSchemaFields | Überprüft, ob der Modus jedes Felds im Zielschema beim DIRECT-Schreiben mit dem Modus im entsprechenden Quellfeldschema übereinstimmt. Der Standardwert ist „true“, d. h. die Prüfung wird standardmäßig durchgeführt. Bei „false“ wird die Modusprüfung ignoriert. | Schreiben |
enableListInference | Gibt an, ob die Schemainferenz speziell im Parkettmodus verwendet werden soll (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). Der Standardwert ist „false“. | Schreiben |
bqChannelPoolSize | Die (feste) Größe des vom BigQueryReadClient erstellten gRPC-Kanalpools. Für eine optimale Leistung sollte dies mindestens auf die Anzahl der Kerne auf den Cluster-Executoren eingestellt werden. | Lesen |
createReadSessionTimeoutInSeconds | Das Zeitlimit in Sekunden zum Erstellen einer ReadSession beim Lesen einer Tabelle. Für extrem große Tabellen sollte dieser Wert erhöht werden. (Optional. Standardmäßig 600 Sekunden) | Lesen |
queryJobPriority | Für den Job festgelegte Prioritätsstufen beim Lesen von Daten aus der BigQuery-Abfrage. Die zulässigen Werte sind:
(Optional. Standardmäßig ist INTERACTIVE ) | Lesen/Schreiben |
destinationTableKmsKeyName | Beschreibt den Cloud KMS-Verschlüsselungsschlüssel, der zum Schutz der BigQuery-Zieltabelle verwendet wird. Das mit Ihrem Projekt verknüpfte BigQuery-Dienstkonto benötigt Zugriff auf diesen Verschlüsselungsschlüssel. Weitere Informationen zur Verwendung von CMEK mit BigQuery finden Sie [hier](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id). Hinweis: Die Tabelle wird nur dann mit dem Schlüssel verschlüsselt, wenn sie vom Connector erstellt wurde. Eine bereits vorhandene unverschlüsselte Tabelle wird nicht allein durch das Festlegen dieser Option verschlüsselt. (Optional) | Schreiben |
allowMapTypeConversion | Boolesche Konfiguration zum Deaktivieren der Konvertierung von BigQuery-Datensätzen in Spark MapType, wenn der Datensatz zwei Unterfelder mit Feldnamen als key und value hat. Der Standardwert ist true , was die Konvertierung ermöglicht.(Optional) | Lesen |
spark.sql.sources.partitionOverwriteMode | Konfigurieren Sie, um den Überschreibmodus beim Schreiben anzugeben, wenn die Tabelle bereichs-/zeitpartitioniert ist. Derzeit werden zwei Modi unterstützt: STATIC und DYNAMIC . Im STATIC Modus wird die gesamte Tabelle überschrieben. Im DYNAMIC Modus werden die Daten durch Partitionen der vorhandenen Tabelle überschrieben. Der Standardwert ist STATIC .(Optional) | Schreiben |
enableReadSessionCaching | Boolesche Konfiguration zum Deaktivieren des Lesesitzungs-Cachings. Speichert BigQuery-Lesesitzungen im Cache, um eine schnellere Planung von Spark-Abfragen zu ermöglichen. Der Standardwert ist true .(Optional) | Lesen |
readSessionCacheDurationMins | Konfigurieren Sie, um die Caching-Dauer der Lesesitzung in Minuten festzulegen. Funktioniert nur, wenn enableReadSessionCaching true ist (Standard). Ermöglicht die Angabe der Dauer, für die Lesesitzungen zwischengespeichert werden sollen. Der maximal zulässige Wert beträgt 300 . Der Standardwert ist 5 .(Optional) | Lesen |
bigQueryJobTimeoutInMinutes | Konfigurieren Sie, um das BigQuery-Job-Timeout in Minuten festzulegen. Der Standardwert beträgt 360 Minuten.(Optional) | Lesen/Schreiben |
snapshotTimeMillis | Ein in Millisekunden angegebener Zeitstempel, der zum Lesen eines Tabellen-Snapshots verwendet werden soll. Standardmäßig ist dies nicht festgelegt und es wird die neueste Version einer Tabelle gelesen. (Optional) | Lesen |
bigNumericDefaultPrecision | Eine alternative Standardgenauigkeit für BigNumeric-Felder, da der BigQuery-Standard zu breit für Spark ist. Die Werte können zwischen 1 und 38 liegen. Diese Standardeinstellung wird nur verwendet, wenn das Feld einen nicht parametrisierten BigNumeric-Typ hat. Bitte beachten Sie, dass es zu Datenverlusten kommen kann, wenn die Genauigkeit der tatsächlichen Daten höher ist als angegeben. (Optional) | Lesen/Schreiben |
bigNumericDefaultScale | Eine alternative Standardskala für BigNumeric-Felder. Die Werte können zwischen 0 und 38 liegen und kleiner als bigNumericFieldsPrecision sein. Dieser Standardwert wird nur verwendet, wenn das Feld einen nicht parametrisierten BigNumeric-Typ hat. Bitte beachten Sie, dass es zu Datenverlusten kommen kann, wenn der tatsächliche Datenumfang über den angegebenen Wert hinausgeht. (Optional) | Lesen/Schreiben |
Optionen können auch außerhalb des Codes festgelegt werden, indem der Parameter --conf
von spark-submit
oder der Parameter --properties
von gcloud dataproc submit spark
verwendet wird. Um dies zu verwenden, stellen Sie das Präfix spark.datasource.bigquery.
auf eine der Optionen, zum Beispiel spark.conf.set("temporaryGcsBucket", "some-bucket")
kann auch als --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
festgelegt werden.
Mit Ausnahme von DATETIME
und TIME
werden alle BigQuery-Datentypen gezielt dem entsprechenden Spark SQL-Datentyp zugeordnet. Hier sind alle Zuordnungen:
BigQuery-Standard-SQL-Datentyp | Spark SQL Datentyp | Notizen |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Weitere Informationen finden Sie unter Numeric- und BigNumeric-Unterstützung |
BIGNUMERIC | DecimalType | Weitere Informationen finden Sie unter Numeric- und BigNumeric-Unterstützung |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark hat keinen DATETIME-Typ. Die Spark-Zeichenfolge kann in eine vorhandene BQ-DATETIME-Spalte geschrieben werden, sofern sie das Format für BQ-DATETIME-Literale aufweist. * Für Spark 3.4+ wird BQ DATETIME als Sparks TimestampNTZ-Typ gelesen, d. h. Java LocalDateTime |
TIME | LongType , StringType * | Spark hat keinen TIME-Typ. Die generierten Longs, die Mikrosekunden seit Mitternacht angeben, können sicher in TimestampType umgewandelt werden, aber dadurch wird das Datum als aktueller Tag abgeleitet. Somit bleiben die Zeiten so lange bestehen und der Benutzer kann bei Bedarf wirken. Beim Umwandeln in den Zeitstempel TIME treten dieselben TimeZone-Probleme auf wie bei DATETIME * Spark-String kann in eine vorhandene BQ TIME-Spalte geschrieben werden, vorausgesetzt, sie hat das Format für BQ TIME-Literale. |
JSON | StringType | Spark hat keinen JSON-Typ. Die Werte werden als String gelesen. Um JSON zurück in BigQuery zu schreiben, sind die folgenden Bedingungen ERFORDERLICH :
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery hat keinen MAP-Typ, daher konvertiert der Connector ähnlich wie bei anderen Konvertierungen wie Apache Avro und BigQuery Load-Jobs eine Spark Map in einen REPEATED STRUCT<key,value>. Das bedeutet, dass das Schreiben und Lesen von Karten zwar möglich ist, die Ausführung von SQL auf BigQuery, das Kartensemantik verwendet, jedoch nicht unterstützt wird. Informationen zum Verweisen auf die Werte der Karte mithilfe von BigQuery SQL finden Sie in der BigQuery-Dokumentation. Aufgrund dieser Inkompatibilitäten gelten einige Einschränkungen:
|
Der Spark ML-Vektor und die Spark ML-Matrix werden unterstützt, einschließlich ihrer dichten und spärlichen Versionen. Die Daten werden als BigQuery RECORD gespeichert. Beachten Sie, dass der Feldbeschreibung ein Suffix hinzugefügt wird, das den Spark-Typ des Felds enthält.
Um diese Typen in BigQuery zu schreiben, verwenden Sie das ORC- oder Avro-Zwischenformat und verwenden Sie sie als Spalte der Zeile (d. h. nicht als Feld in einer Struktur).
BigNumeric von BigQuery hat eine Genauigkeit von 76,76 (die 77. Ziffer ist teilweise) und eine Skalierung von 38. Da diese Präzision und Skalierung über die DecimalType-Unterstützung (38 Skalierung und 38 Genauigkeit) von Spark hinausgeht, bedeutet dies, dass BigNumeric-Felder mit einer Genauigkeit größer als 38 nicht verwendet werden können . Sobald diese Spark-Einschränkung aktualisiert wird, wird der Connector entsprechend aktualisiert.
Bei der Spark Decimal/BigQuery Numeric-Konvertierung wird versucht, die Parametrisierung des Typs beizubehalten, d. h. NUMERIC(10,2)
wird in Decimal(10,2)
konvertiert und umgekehrt. Beachten Sie jedoch, dass es Fälle gibt, in denen die Parameter verloren gehen. Dies bedeutet, dass die Parameter auf die Standardwerte zurückgesetzt werden – NUMERIC (38,9) und BIGNUMERIC(76,38). Das bedeutet, dass das BigNumeric-Lesen derzeit nur aus einer Standardtabelle unterstützt wird, nicht jedoch aus der BigQuery-Ansicht oder beim Lesen von Daten aus einer BigQuery-Abfrage.
Der Connector berechnet automatisch die Spalten- und Pushdown-Filter der SELECT
-Anweisung des DataFrame, z
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
filtert nach der Spalte word
und drückt den Prädikatfilter word = 'hamlet' or word = 'Claudius'
nach unten.
Wenn Sie nicht mehrere Leseanfragen an BigQuery senden möchten, können Sie den DataFrame vor dem Filtern zwischenspeichern, z. B.:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
Sie können die filter
auch manuell angeben, wodurch das automatische Pushdown außer Kraft gesetzt wird und Spark den Rest der Filterung im Client übernimmt.
Die Pseudospalten _PARTITIONDATE und _PARTITIONTIME sind nicht Teil des Tabellenschemas. Um die Partitionen partitionierter Tabellen abzufragen, verwenden Sie daher nicht die oben gezeigte Methode where(). Fügen Sie stattdessen wie folgt eine Filteroption hinzu:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
Standardmäßig erstellt der Connector eine Partition pro 400 MB in der gelesenen Tabelle (vor dem Filtern). Dies sollte in etwa der maximalen Anzahl von Lesern entsprechen, die von der BigQuery Storage API unterstützt werden. Dies kann explizit mit der Eigenschaft maxParallelism
konfiguriert werden. BigQuery kann die Anzahl der Partitionen aufgrund von Servereinschränkungen begrenzen.
Um die Nachverfolgung der Nutzung von BigQuery-Ressourcen zu unterstützen, bieten die Connectors die folgenden Optionen zum Markieren von BigQuery-Ressourcen:
Der Connector kann BigQuery-Lade- und Abfragejobs starten. Das Hinzufügen von Labels zu den Jobs erfolgt auf folgende Weise:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
Dadurch werden die Labels cost_center
= analytics
und usage
= nightly_etl
erstellt.
Wird zum Kommentieren der Lese- und Schreibsitzungen verwendet. Die Trace-ID hat das Format Spark:ApplicationName:JobID
. Dies ist eine Opt-In-Option, und um sie verwenden zu können, muss der Benutzer die Eigenschaft traceApplicationName
festlegen. Die Job-ID wird automatisch von der Dataproc-Job-ID generiert, mit einem Fallback auf die Spark-Anwendungs-ID (z. B. application_1648082975639_0001
). Die Job-ID kann durch Festlegen der Option traceJobId
überschrieben werden. Beachten Sie, dass die Gesamtlänge der Trace-ID 256 Zeichen nicht überschreiten darf.
Der Connector kann in Jupyter-Notebooks verwendet werden, auch wenn er nicht auf dem Spark-Cluster installiert ist. Es kann mit dem folgenden Code als externes JAR hinzugefügt werden:
Python:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
Scala:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
Falls der Spark-Cluster Scala 2.12 verwendet (optional für Spark 2.4.x, obligatorisch in 3.0.x), dann ist das relevante Paket com.google.cloud.spark:spark-bigquery-with-dependencies_ 2.12 :0.41.0. Um zu erfahren, welche Scala-Version verwendet wird, führen Sie bitte den folgenden Code aus:
Python:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
Scala:
scala . util . Properties . versionString
Sofern Sie nicht die implizite Scala-API spark.read.bigquery("TABLE_ID")
verwenden möchten, ist keine Kompilierung für den Connector erforderlich.
So fügen Sie den Connector in Ihr Projekt ein:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark füllt viele Metriken aus, die der Endbenutzer auf der Spark-Verlaufsseite finden kann. Aber alle diese Metriken beziehen sich auf Funken und werden implizit erfasst, ohne dass Änderungen am Connector vorgenommen werden müssen. Es gibt jedoch nur wenige Metriken, die aus der BigQuery besiedelt sind und derzeit in den Anwendungsprotokollen sichtbar sind, die in den Treiber-/Ausführungs -Protokollen gelesen werden können.
Ab Spark 3.2 hat Spark die API zur Verfügung gestellt, um benutzerdefinierte Metriken auf der Spark UI -Seite https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/Metric aufzudecken. /Custommetric.html
Der Connector unter Verwendung dieser API zeigt derzeit die folgenden BigQuery -Metriken während des Lesens
<Styles> Tabelle#metricStable TD, Tabelle Th {Word-Break: Break-Wort} </style>Metrischer Name | Beschreibung |
---|---|
bytes read | Anzahl der BigQuery -Bytes gelesen |
rows read | Anzahl der BigQuery -Zeilen gelesen |
scan time | Die Zeit, die zwischen den Read -Zeilen -Antwort auf alle Ausführenden in Millisekunden aufgewendet wurde. |
parse time | Die Zeit, die für die Parsen der Reihen in allen Testamentsvollstreckern in Millisekunden aufgewendet wird. |
spark time | Die Zeit, die in Spark verbracht hat, um die Abfragen (dh abgesehen vom Scannen und Parsen) über alle Testamentsvollstrecker in Millisekunden zu verarbeiten. |
Hinweis: Um die Metriken auf der Funken-UI-Seite zu verwenden, müssen Sie sicherstellen, dass der spark-bigquery-metrics-0.41.0.jar
der Klassenpfad ist, bevor Sie mit dem Geschichte des Verlaufs beginnen, und die Anschlussversion ist spark-3.2
oder höher.
Siehe die BigQuery -Preisdokumentation.
Sie können die Anzahl der Partitionen manuell mit der maxParallelism
-Eigenschaft festlegen. BigQuery kann weniger Partitionen bieten, als Sie verlangen. Siehe Konfigurieren der Partitionierung.
Sie können auch nach dem Lesen in Spark immer wieder abgeben.
Wenn es zu viele Partitionen gibt, können die Kreaturen- oder Durchsatzquoten überschritten werden. Dies geschieht, da die Daten innerhalb jeder Partition seriell verarbeitet werden, unabhängige Partitionen parallel auf verschiedenen Knoten innerhalb des Spark -Clusters verarbeitet werden. Um einen maximalen anhaltenden Durchsatz sicherzustellen, sollten Sie im Allgemeinen eine Quoten erhöhen. Sie können jedoch auch die Anzahl der Partitionen reduzieren, indem Sie den coalesce
anrufen, um dieses Problem zu mildern.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
Eine Faustregel lautet, dass ein einzelner Partitionsgriff mindestens 1 GB Daten.
Beachten Sie auch, dass ein Job mit der writeAtLeastOnce
-Immobilie eingeschaltet wird.
Der Anschluss benötigt eine Instanz eines Googlecredentials, um eine Verbindung zu den BigQuery -APIs herzustellen. Es gibt mehrere Optionen, um es bereitzustellen:
GOOGLE_APPLICATION_CREDENTIALS
zu laden, wie hier beschrieben. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
bereitgestellt werden. AccessTokenProvider
muss in Java oder einer anderen JVM -Sprache wie Scala oder Kotlin implementiert werden. Es muss entweder einen No-Arg-Konstruktor oder einen Konstruktor haben, der ein einzelnes Argument java.util.String
akzeptiert. Dieser Konfigurationsparameter kann mit der Option gcpAccessTokenProviderConfig
geliefert werden. Wenn dies nicht bereitgestellt wird, wird der No-Arg-Konstruktor aufgerufen. Das Glas, das die Implementierung enthält, sollte sich auf dem Klassenpfad des Clusters befinden. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
Immobilienkonto -Imitieren können für einen bestimmten Benutzernamen und einen Gruppennamen oder für alle Benutzer standardmäßig unter Verwendung der folgenden Eigenschaften konfiguriert werden:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(standardmäßig nicht festgelegt)
Das Service -Account -Imitieren für einen bestimmten Benutzer.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(standardmäßig nicht festgelegt)
Die Imitierung des Service -Kontos für eine bestimmte Gruppe.
gcpImpersonationServiceAccount
(standardmäßig nicht festgelegt)
Standard -Servicekonto Imitation für alle Benutzer.
Wenn eines der oben genannten Eigenschaften festgelegt wird, wird das angegebene Servicekonto durch die Generierung von kurzlebigen Anmeldeinformationen beim Zugriff auf BigQuery ausgeht.
Wenn mehr als eine Eigenschaft festgelegt wird, hat das mit dem Benutzernamen verbundene Servicekonto Vorrang vor dem Servicekonto, das dem Gruppennamen für einen passenden Benutzer und eine passende Gruppe zugeordnet ist, was wiederum Vorrang vor dem Stimmausfall -Imitieren von Service -Konto hat.
Für eine einfachere Anwendung, bei der keine Aktualisierung des Zugriffsantriebs erforderlich ist, besteht eine andere Alternative darin, das Zugriffs -Token als gcpAccessToken
-Konfigurationsoption zu übergeben. Sie können das Access-Token erhalten, indem Sie gcloud auth application-default print-access-token
ausführen.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
Wichtig: Der CredentialsProvider
und AccessTokenProvider
müssen in Java oder einer anderen JVM -Sprache wie Scala oder Kotlin implementiert werden. Das Glas, das die Implementierung enthält, sollte sich auf dem Klassenpfad des Clusters befinden.
Hinweis: Es sollte nur eine der oben genannten Optionen bereitgestellt werden.
So konfigurieren Sie die folgenden Optionen, um eine Verbindung zu einem Vorwärtsproxy herzustellen und die Benutzeranmeldeinformationen zu authentifizieren.
proxyAddress
: Adresse des Proxy -Servers. Der Proxy muss ein HTTP -Proxy sein, und die Adresse sollte im host:port
-Format sein.
proxyUsername
: Der Benutzername, mit dem eine Verbindung zum Proxy hergestellt wurde.
proxyPassword
: Das Kennwort, mit dem eine Verbindung zum Proxy hergestellt wird.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
Die gleichen Proxy -Parameter können auch weltweit mit wie folgt mit Sparks RunTimeConfig eingestellt werden:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
Sie können Folgendes auch in der Hadoop -Konfiguration festlegen.
fs.gs.proxy.address
(ähnlich wie "proxyaddress"), fs.gs.proxy.username
(ähnlich wie "proxyusername") und fs.gs.proxy.password
(ähnlich wie "Proxypassword").
Wenn der gleiche Parameter auf mehreren Stellen eingestellt ist, lautet die Reihenfolge der Priorität wie folgt:
Option ("Schlüssel", "Wert")> spark.conf> Hadoop -Konfiguration