Le connecteur prend en charge la lecture des tables Google BigQuery dans les DataFrames de Spark et la réécriture des DataFrames dans BigQuery. Cela se fait en utilisant l'API Spark SQL Data Source pour communiquer avec BigQuery.
L'API Storage diffuse des données en parallèle directement depuis BigQuery via gRPC sans utiliser Google Cloud Storage comme intermédiaire.
Il présente un certain nombre d'avantages par rapport à l'utilisation du flux de lecture précédent basé sur l'exportation, qui devraient généralement conduire à de meilleures performances de lecture :
Il ne laisse aucun fichier temporaire dans Google Cloud Storage. Les lignes sont lues directement à partir des serveurs BigQuery à l'aide des formats de fil Arrow ou Avro.
La nouvelle API permet le filtrage des colonnes et des prédicats pour lire uniquement les données qui vous intéressent.
Étant donné que BigQuery s'appuie sur une banque de données en colonnes, il peut diffuser efficacement des données sans lire toutes les colonnes.
L'API Storage prend en charge la suppression arbitraire des filtres de prédicats. La version 0.8.0 bêta et supérieure du connecteur prend en charge le transfert de filtres arbitraires vers Bigquery.
Il existe un problème connu dans Spark qui ne permet pas le déploiement des filtres sur les champs imbriqués. Par exemple, les filtres comme address.city = "Sunnyvale"
ne seront pas transférés vers Bigquery.
L'API rééquilibre les enregistrements entre les lecteurs jusqu'à ce qu'ils soient tous terminés. Cela signifie que toutes les phases de la carte se termineront presque simultanément. Consultez cet article de blog pour découvrir comment le partitionnement dynamique est utilisé de la même manière dans Google Cloud Dataflow.
Voir Configuration du partitionnement pour plus de détails.
Suivez ces instructions.
Si vous ne disposez pas d'un environnement Apache Spark, vous pouvez créer un cluster Cloud Dataproc avec une authentification préconfigurée. Les exemples suivants supposent que vous utilisez Cloud Dataproc, mais vous pouvez utiliser spark-submit
sur n'importe quel cluster.
Tout cluster Dataproc utilisant l'API nécessite les étendues "bigquery" ou "cloud-platform". Les clusters Dataproc ont la portée « bigquery » par défaut. La plupart des clusters des projets activés devraient donc fonctionner par défaut, par exemple.
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
La dernière version du connecteur est accessible au public via les liens suivants :
version | Lien |
---|---|
Étincelle 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (lien HTTP) |
Étincelle 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (lien HTTP) |
Étincelle 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (lien HTTP) |
Étincelle 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (lien HTTP) |
Étincelle 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (lien HTTP) |
Étincelle 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (lien HTTP) |
Échelle 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (lien HTTP) |
Échelle 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (lien HTTP) |
Scala 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (lien HTTP) |
Les six premières versions sont des connecteurs basés sur Java ciblant Spark 2.4/3.1/3.2/3.3/3.4/3.5 de toutes les versions Scala construites sur les nouvelles API Data Source (Data Source API v2) de Spark.
Les deux derniers connecteurs sont des connecteurs basés sur Scala, veuillez utiliser le fichier jar correspondant à votre installation Spark, comme indiqué ci-dessous.
Connecteur Étincelle | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
spark-3.5-bigquery | ✓ | |||||||
spark-3.4-bigquery | ✓ | ✓ | ||||||
spark-3.3-bigquery | ✓ | ✓ | ✓ | |||||
spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
spark-2.4-bigquery | ✓ | |||||||
spark-bigquery-avec-dépendances_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
spark-bigquery-avec-dépendances_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
spark-bigquery-avec-dépendances_2.11 | ✓ | ✓ |
Connecteur Image Dataproc | 1.3 | 1.4 | 1,5 | 2.0 | 2.1 | 2.2 | Sans serveur Image 1.0 | Sans serveur Image 2.0 | Sans serveur Image 2.1 | Sans serveur Image 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
spark-3.5-bigquery | ✓ | ✓ | ||||||||
spark-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
spark-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
spark-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
spark-2.4-bigquery | ✓ | ✓ | ||||||||
spark-bigquery-avec-dépendances_2.13 | ✓ | ✓ | ✓ | |||||||
spark-bigquery-avec-dépendances_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
spark-bigquery-avec-dépendances_2.11 | ✓ | ✓ |
Le connecteur est également disponible depuis le référentiel Maven Central. Il peut être utilisé à l'aide de l'option --packages
ou de la propriété de configuration spark.jars.packages
. Utilisez la valeur suivante
version | Artefact de connecteur |
---|---|
Étincelle 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
Étincelle 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
Étincelle 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
Étincelle 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
Étincelle 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
Étincelle 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
Échelle 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
Échelle 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
Scala 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
Les clusters Dataproc créés à l'aide de l'image 2.1 et ultérieure, ou les lots utilisant le service sans serveur Dataproc, sont livrés avec le connecteur Spark BigQuery intégré. L'utilisation des standards --jars
ou --packages
(ou alternativement, la configuration spark.jars
/ spark.jars.packages
) n'aidera pas dans ce cas car le connecteur intégré est prioritaire.
Pour utiliser une autre version que celle intégrée, veuillez effectuer l'une des opérations suivantes :
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
, ou --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
pour créer le cluster avec un fichier jar différent. L'URL peut pointer vers n'importe quel JAR de connecteur valide pour la version Spark du cluster.--properties dataproc.sparkBqConnector.version=0.41.0
ou --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
pour créer le lot avec un pot différent. L'URL peut pointer vers n'importe quel JAR de connecteur valide pour la version Spark du runtime. Vous pouvez exécuter un simple décompte de mots PySpark sur l'API sans compilation en exécutant
Image Dataproc 1.5 et versions ultérieures
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Image Dataproc 1.4 et versions antérieures
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
Le connecteur utilise l'API multilingue Spark SQL Data Source :
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
ou l'API implicite Scala uniquement :
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
Pour plus d’informations, consultez des exemples de code supplémentaires en Python, Scala et Java.
Le connecteur vous permet d'exécuter n'importe quelle requête SQL SELECT standard sur BigQuery et de récupérer ses résultats directement dans un Spark Dataframe. Cela se fait facilement, comme décrit dans l'exemple de code suivant :
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
Ce qui donne le résultat
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
Une deuxième option consiste à utiliser l'option query
comme ceci :
df = spark.read.format("bigquery").option("query", sql).load()
Notez que l'exécution devrait être plus rapide car seul le résultat est transmis par fil. De la même manière, les requêtes peuvent inclure des JOIN plus efficacement que l'exécution de jointures sur Spark ou utiliser d'autres fonctionnalités BigQuery telles que des sous-requêtes, des fonctions définies par l'utilisateur BigQuery, des tables génériques, BigQuery ML, etc.
Afin d'utiliser cette fonctionnalité, les configurations suivantes DOIVENT être définies :
viewsEnabled
doit être défini sur true
.materializationDataset
doit être défini sur un ensemble de données pour lequel l'utilisateur GCP dispose de l'autorisation de création de table. materializationProject
est facultatif. Remarque : Comme mentionné dans la documentation BigQuery, les tables interrogées doivent se trouver au même emplacement que materializationDataset
. De plus, si les tables de l' SQL statement
proviennent de projets autres que parentProject
, utilisez le nom de table complet, c'est-à-dire [project].[dataset].[table]
.
Important : Cette fonctionnalité est implémentée en exécutant la requête sur BigQuery et en enregistrant le résultat dans une table temporaire, à partir de laquelle Spark lira les résultats. Cela peut entraîner des coûts supplémentaires sur votre compte BigQuery.
Le connecteur dispose d'une prise en charge préliminaire de la lecture à partir des vues BigQuery. Veuillez noter qu'il y a quelques mises en garde :
collect()
ou count()
.materializationProject
et materializationDataset
. Ces options peuvent également être définies globalement en appelant spark.conf.set(...)
avant de lire les vues..option("viewsEnabled", "true")
) ou définissez-la globalement en appelant spark.conf.set("viewsEnabled", "true")
.materializationDataset
doit se trouver au même emplacement que la vue.L'écriture de DataFrames dans BigQuery peut être effectuée à l'aide de deux méthodes : directe et indirecte.
Dans cette méthode, les données sont écrites directement dans BigQuery à l'aide de l'API BigQuery Storage Write. Afin d'activer cette option, veuillez définir l'option writeMethod
sur direct
, comme indiqué ci-dessous :
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
L'écriture dans des tables partitionnées existantes (partitionnée par date, partitionnement par heure d'ingestion et partitionnement par plage) en mode de sauvegarde APPEND et en mode OVERWRITE (partitionné par date et plage uniquement) est entièrement prise en charge par le connecteur et l'API d'écriture BigQuery Storage. L'utilisation de datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
décrite ci-dessous n'est pas prise en charge pour le moment par la méthode d'écriture directe.
Important : Veuillez vous référer à la page de tarification de l'ingestion de données concernant la tarification de l'API BigQuery Storage Write.
Important : Veuillez utiliser la version 0.24.2 et supérieure pour les écritures directes, car les versions précédentes comportent un bug pouvant entraîner une suppression de table dans certains cas.
Dans cette méthode, les données sont d'abord écrites dans GCS, puis chargées dans BigQuery. Un compartiment GCS doit être configuré pour indiquer l'emplacement temporaire des données.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
Les données sont stockées temporairement aux formats Apache Parquet, Apache ORC ou Apache Avro.
Le compartiment GCS et le format peuvent également être définis globalement à l'aide de RuntimeConfig de Spark comme ceci :
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
Lors du streaming d'un DataFrame vers BigQuery, chaque lot est écrit de la même manière qu'un DataFrame sans streaming. Notez qu'un emplacement de point de contrôle compatible HDFS (par exemple : path/to/HDFS/dir
ou gs://checkpoint-bucket/checkpointDir
) doit être spécifié.
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
Important : Le connecteur ne configure pas le connecteur GCS, afin d'éviter tout conflit avec un autre connecteur GCS, s'il existe. Afin d'utiliser les capacités d'écriture du connecteur, veuillez configurer le connecteur GCS sur votre cluster comme expliqué ici.
L'API prend en charge un certain nombre d'options pour configurer la lecture
<style> table#propertytable td, table th { word-break:break-word } </style>Propriété | Signification | Usage |
---|---|---|
table | La table BigQuery au format [[project:]dataset.]table . Il est recommandé d'utiliser plutôt le paramètre path de load() / save() . Cette option est obsolète et sera supprimée dans une prochaine version.(Obsolète) | Lire/écrire |
dataset | L'ensemble de données contenant la table. Cette option doit être utilisée avec les tables et vues standard, mais pas lors du chargement des résultats de requête. (Facultatif sauf si omis dans table ) | Lire/écrire |
project | ID de projet Google Cloud de la table. Cette option doit être utilisée avec les tables et vues standard, mais pas lors du chargement des résultats de requête. (Facultatif. La valeur par défaut est le projet du compte de service utilisé) | Lire/écrire |
parentProject | ID de projet Google Cloud de la table à facturer pour l'exportation. (Facultatif. La valeur par défaut est le projet du compte de service utilisé) | Lire/écrire |
maxParallelism | Le nombre maximal de partitions dans lesquelles diviser les données. Le nombre réel peut être inférieur si BigQuery estime que les données sont suffisamment petites. S'il n'y a pas assez d'exécuteurs pour planifier un lecteur par partition, certaines partitions peuvent être vides. Important : L'ancien paramètre ( parallelism ) est toujours pris en charge mais en mode obsolète. Il sera supprimé dans la version 1.0 du connecteur.(Facultatif. La valeur par défaut est la valeur la plus élevée entre PreferredMinParallelism et 20 000).) | Lire |
preferredMinParallelism | Nombre minimal préféré de partitions dans lesquelles diviser les données. Le nombre réel peut être inférieur si BigQuery estime que les données sont suffisamment petites. S'il n'y a pas assez d'exécuteurs pour planifier un lecteur par partition, certaines partitions peuvent être vides. (Facultatif. La valeur par défaut est la plus petite de 3 fois le parallélisme et maxParallelism par défaut de l'application.) | Lire |
viewsEnabled | Permet au connecteur de lire à partir de vues et pas seulement de tables. Veuillez lire la section correspondante avant d'activer cette option. (Facultatif. La valeur par défaut est false ) | Lire |
materializationProject | L'identifiant du projet où la vue matérialisée va être créée (Facultatif. Par défaut, l'ID du projet est affiché) | Lire |
materializationDataset | L'ensemble de données dans lequel la vue matérialisée va être créée. Cet ensemble de données doit se trouver au même emplacement que la vue ou les tables interrogées. (Facultatif. Par défaut, l'ensemble de données d'affichage est affiché) | Lire |
materializationExpirationTimeInMinutes | Le délai d'expiration de la table temporaire contenant les données matérialisées d'une vue ou d'une requête, en minutes. Notez que le connecteur peut réutiliser la table temporaire en raison de l'utilisation du cache local et afin de réduire le calcul BigQuery. Des valeurs très faibles peuvent donc provoquer des erreurs. La valeur doit être un entier positif. (Facultatif. La valeur par défaut est 1440 ou 24 heures) | Lire |
readDataFormat | Format de données pour la lecture à partir de BigQuery. Options : ARROW , AVRO (Facultatif. La valeur par défaut est ARROW ) | Lire |
optimizedEmptyProjection | Le connecteur utilise une logique de projection vide optimisée (sélection sans aucune colonne), utilisée pour l'exécution count() . Cette logique prend les données directement à partir des métadonnées de la table ou effectue un « SELECT COUNT(*) WHERE... » très efficace au cas où il y aurait un filtre. Vous pouvez annuler l'utilisation de cette logique en définissant cette option sur false .(Facultatif, la valeur par défaut est true ) | Lire |
pushAllFilters | S'il est défini sur true , le connecteur transmet tous les filtres que Spark peut déléguer à l'API BigQuery Storage. Cela réduit la quantité de données qui doivent être envoyées depuis les serveurs de l'API BigQuery Storage vers les clients Spark. Cette option est obsolète et sera supprimée dans une prochaine version.(Facultatif, la valeur par défaut est true )(Obsolète) | Lire |
bigQueryJobLabel | Peut être utilisé pour ajouter des étiquettes à la requête lancée par le connecteur et charger des tâches BigQuery. Plusieurs étiquettes peuvent être définies. (Facultatif) | Lire |
bigQueryTableLabel | Peut être utilisé pour ajouter des étiquettes au tableau lors de l’écriture dans un tableau. Plusieurs étiquettes peuvent être définies. (Facultatif) | Écrire |
traceApplicationName | Nom de l'application utilisé pour tracer les sessions de lecture et d'écriture BigQuery Storage. La définition du nom de l'application est requise pour définir l'ID de trace sur les sessions. (Facultatif) | Lire |
traceJobId | ID de tâche utilisé pour tracer les sessions de lecture et d'écriture BigQuery Storage. (Facultatif, la valeur par défaut est l'ID de tâche Dataproc, sinon l'ID de l'application Spark est utilisé.) | Lire |
createDisposition | Spécifie si le travail est autorisé à créer de nouvelles tables. Les valeurs autorisées sont :
(Facultatif. Par défaut : CREATE_IF_NEEDED). | Écrire |
writeMethod | Contrôle la méthode dans laquelle les données sont écrites dans BigQuery. Les valeurs disponibles sont direct pour utiliser l'API BigQuery Storage Write et indirect qui écrivent d'abord les données dans GCS, puis déclenchent une opération de chargement BigQuery. En voir plus ici(Facultatif, par défaut indirect ) | Écrire |
writeAtLeastOnce | Garantit que les données sont écrites dans BigQuery au moins une fois. Il s’agit d’une garantie moindre qu’exactement une fois. Cela convient aux scénarios de streaming dans lesquels les données sont écrites en continu par petits lots. (Facultatif. La valeur par défaut est false )Pris en charge uniquement par la méthode d'écriture `DIRECT` et le mode n'est PAS `Overwrite`. | Écrire |
temporaryGcsBucket | Bucket GCS qui contient temporairement les données avant qu'elles ne soient chargées dans BigQuery. Obligatoire sauf si défini dans la configuration Spark ( spark.conf.set(...) ).Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
persistentGcsBucket | Bucket GCS qui contient les données avant leur chargement dans BigQuery. Si vous en êtes informé, les données ne seront pas supprimées après leur écriture dans BigQuery. Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
persistentGcsPath | Chemin GCS qui contient les données avant leur chargement dans BigQuery. Utilisé uniquement avec persistentGcsBucket .Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
intermediateFormat | Format des données avant leur chargement dans BigQuery. Les valeurs peuvent être "parquet", "orc" ou "avro". Afin d'utiliser le format Avro, le package spark-avro doit être ajouté au runtime. (Facultatif. La valeur par défaut est parquet ). En écriture uniquement. Pris en charge uniquement pour la méthode d'écriture `INDIRECT`. | Écrire |
useAvroLogicalTypes | Lors du chargement depuis Avro (`.option("intermediateFormat", "avro")`), BigQuery utilise les types Avro sous-jacents au lieu des types logiques [par défaut](https://cloud.google.com/bigquery/docs/ chargement-données-cloud-storage-avro#logical_types). La fourniture de cette option convertit les types logiques Avro en leurs types de données BigQuery correspondants. (Facultatif. La valeur par défaut est false ). En écriture uniquement. | Écrire |
datePartition | La partition de date sur laquelle les données vont être écrites. Doit être une chaîne de date donnée au format YYYYMMDD . Peut être utilisé pour écraser les données d'une seule partition, comme ceci :
(Facultatif). En écriture uniquement. Peut également être utilisé avec différents types de partitions comme : HEURE : YYYYMMDDHH MOIS : YYYYMM ANNÉE : YYYY Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
partitionField | Si ce champ est spécifié, la table est partitionnée par ce champ. Pour le partitionnement temporel, spécifiez avec l'option `partitionType`. Pour le partitionnement par plages entières, spécifiez avec les 3 options : `partitionRangeStart`, `partitionRangeEnd, `partitionRangeInterval`. Le champ doit être un champ TIMESTAMP ou DATE de niveau supérieur pour le partitionnement temporel, ou INT64 pour le partitionnement par plage d'entiers. Son mode doit être NULLABLE ou REQUIRED . Si l'option n'est pas définie pour une table partitionnée par heure, alors la table sera partitionnée par pseudo-colonne, référencée soit via '_PARTITIONTIME' as TIMESTAMP , soit '_PARTITIONDATE' as DATE .(Facultatif). Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
partitionExpirationMs | Nombre de millisecondes pendant lesquelles conserver le stockage des partitions dans la table. Le stockage dans une partition aura un délai d'expiration correspondant à son temps de partition plus cette valeur. (Facultatif). Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
partitionType | Utilisé pour spécifier le partitionnement temporel. Les types pris en charge sont : HOUR, DAY, MONTH, YEAR Cette option est obligatoire pour qu'une table cible soit partitionnée en temps. (Facultatif. La valeur par défaut est DAY si PartitionField est spécifié). Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | Utilisé pour spécifier le partitionnement par plages entières. Ces options sont obligatoires pour qu'une table cible soit partitionnée en plages d'entiers. Les 3 options doivent être spécifiées. Non pris en charge par la méthode d'écriture `DIRECT`. | Écrire |
clusteredFields | Chaîne de colonnes de niveau supérieur non répétées, séparées par une virgule. (Facultatif). | Écrire |
allowFieldAddition | Ajoute ALLOW_FIELD_ADDITION SchemaUpdateOption au BigQuery LoadJob. Les valeurs autorisées sont true et false .(Facultatif. La valeur par défaut est false ).Pris en charge uniquement par la méthode d'écriture `INDIRECT`. | Écrire |
allowFieldRelaxation | Ajoute ALLOW_FIELD_RELAXATION SchemaUpdateOption au BigQuery LoadJob. Les valeurs autorisées sont true et false .(Facultatif. La valeur par défaut est false ).Pris en charge uniquement par la méthode d'écriture `INDIRECT`. | Écrire |
proxyAddress | Adresse du serveur proxy. Le proxy doit être un proxy HTTP et l'adresse doit être au format « hôte : port ». Peut être défini alternativement dans la configuration Spark ( spark.conf.set(...) ) ou dans la configuration Hadoop ( fs.gs.proxy.address ).(Facultatif. Obligatoire uniquement si vous vous connectez à GCP via un proxy.) | Lire/écrire |
proxyUsername | Le nom d'utilisateur utilisé pour se connecter au proxy. Peut être défini alternativement dans la configuration Spark ( spark.conf.set(...) ) ou dans la configuration Hadoop ( fs.gs.proxy.username ).(Facultatif. Obligatoire uniquement si vous vous connectez à GCP via un proxy avec authentification.) | Lire/écrire |
proxyPassword | Le mot de passe utilisé pour se connecter au proxy. Peut être défini alternativement dans la configuration Spark ( spark.conf.set(...) ) ou dans la configuration Hadoop ( fs.gs.proxy.password ).(Facultatif. Obligatoire uniquement si vous vous connectez à GCP via un proxy avec authentification.) | Lire/écrire |
httpMaxRetry | Nombre maximal de tentatives pour les requêtes HTTP de bas niveau adressées à BigQuery. Peut également être défini dans la configuration Spark ( spark.conf.set("httpMaxRetry", ...) ) ou dans la configuration Hadoop ( fs.gs.http.max.retry ).(Facultatif. La valeur par défaut est 10) | Lire/écrire |
httpConnectTimeout | Délai d'expiration en millisecondes pour établir une connexion avec BigQuery. Peut être défini alternativement dans la configuration Spark ( spark.conf.set("httpConnectTimeout", ...) ) ou dans la configuration Hadoop ( fs.gs.http.connect-timeout ).(Facultatif. La valeur par défaut est 60 000 ms. 0 pour un délai d'attente infini, un nombre négatif pour 20 000) | Lire/écrire |
httpReadTimeout | Délai d'expiration en millisecondes pour lire les données d'une connexion établie. Peut être défini alternativement dans la configuration Spark ( spark.conf.set("httpReadTimeout", ...) ) ou dans la configuration Hadoop ( fs.gs.http.read-timeout ).(Facultatif. La valeur par défaut est 60 000 ms. 0 pour un délai d'attente infini, un nombre négatif pour 20 000) | Lire |
arrowCompressionCodec | Codec de compression lors de la lecture d'une table BigQuery lors de l'utilisation du format Arrow. Options : ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED . Le codec de compression recommandé est ZSTD lors de l'utilisation de Java.(Facultatif. La valeur par défaut est COMPRESSION_UNSPECIFIED , ce qui signifie qu'aucune compression ne sera utilisée) | Lire |
responseCompressionCodec | Codec de compression utilisé pour compresser les données ReadRowsResponse. Options : RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 (Facultatif. La valeur par défaut est RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , ce qui signifie qu'aucune compression ne sera utilisée) | Lire |
cacheExpirationTimeInMinutes | Délai d'expiration du cache en mémoire stockant les informations de requête. Pour désactiver la mise en cache, définissez la valeur sur 0. (Facultatif. La valeur par défaut est 15 minutes) | Lire |
enableModeCheckForSchemaFields | Vérifie que le mode de chaque champ du schéma de destination est égal au mode du schéma de champ source correspondant, lors de l'écriture DIRECTe. La valeur par défaut est vraie, c'est-à-dire que la vérification est effectuée par défaut. Si la valeur est false, la vérification du mode est ignorée. | Écrire |
enableListInference | Indique s'il faut utiliser l'inférence de schéma spécifiquement lorsque le mode est Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). La valeur par défaut est false. | Écrire |
bqChannelPoolSize | Taille (fixe) du pool de canaux gRPC créé par BigQueryReadClient. Pour des performances optimales, ce paramètre doit être défini sur au moins le nombre de cœurs sur les exécuteurs du cluster. | Lire |
createReadSessionTimeoutInSeconds | Le délai d'attente en secondes pour créer une ReadSession lors de la lecture d'une table. Pour une table extrêmement grande, cette valeur doit être augmentée. (Facultatif. La valeur par défaut est 600 secondes) | Lire |
queryJobPriority | Niveaux de priorité définis pour la tâche lors de la lecture des données d'une requête BigQuery. Les valeurs autorisées sont :
(Facultatif. La valeur par défaut est INTERACTIVE ) | Lire/écrire |
destinationTableKmsKeyName | Décrit la clé de chiffrement Cloud KMS qui sera utilisée pour protéger la table BigQuery de destination. Le compte de service BigQuery associé à votre projet nécessite l'accès à cette clé de chiffrement. pour plus d'informations sur l'utilisation de CMEK avec BigQuery, voir [ici](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id). Remarque : La table sera chiffrée par la clé uniquement si elle a été créée par le connecteur. Une table non chiffrée préexistante ne sera pas chiffrée simplement en définissant cette option. (Facultatif) | Écrire |
allowMapTypeConversion | Configuration booléenne pour désactiver la conversion des enregistrements BigQuery vers Spark MapType lorsque l'enregistrement comporte deux sous-champs avec des noms de champ comme key et value . La valeur par défaut est true ce qui permet la conversion.(Facultatif) | Lire |
spark.sql.sources.partitionOverwriteMode | Configuration pour spécifier le mode d'écrasement lors de l'écriture lorsque la table est partitionnée en plage/heure. Actuellement pris en charge deux modes : STATIC et DYNAMIC . En mode STATIC , la table entière est écrasée. En mode DYNAMIC , les données sont écrasées par les partitions de la table existante. La valeur par défaut est STATIC .(Facultatif) | Écrire |
enableReadSessionCaching | Configuration booléenne pour désactiver la mise en cache des sessions de lecture. Met en cache les sessions de lecture BigQuery pour permettre une planification plus rapide des requêtes Spark. La valeur par défaut est true .(Facultatif) | Lire |
readSessionCacheDurationMins | Configuration pour définir la durée de mise en cache de la session de lecture en minutes. Fonctionne uniquement si enableReadSessionCaching est true (par défaut). Permet de spécifier la durée de mise en cache des sessions de lecture. La valeur maximale autorisée est 300 . La valeur par défaut est 5 .(Facultatif) | Lire |
bigQueryJobTimeoutInMinutes | Configuration pour définir le délai d'expiration de la tâche BigQuery en minutes. La valeur par défaut est 360 minutes.(Facultatif) | Lire/écrire |
snapshotTimeMillis | Un horodatage spécifié en millisecondes à utiliser pour lire un instantané de table. Par défaut, cela n'est pas défini et la dernière version d'une table est lue. (Facultatif) | Lire |
bigNumericDefaultPrecision | Précision par défaut alternative pour les champs BigNumeric, car la valeur par défaut de BigQuery est trop large pour Spark. Les valeurs peuvent être comprises entre 1 et 38. Cette valeur par défaut est utilisée uniquement lorsque le champ a un type BigNumeric non paramétré. Veuillez noter qu'il peut y avoir une perte de données si la précision des données réelles est supérieure à celle spécifiée. (Facultatif) | Lire/écrire |
bigNumericDefaultScale | Une échelle par défaut alternative pour les champs BigNumeric. Les valeurs peuvent être comprises entre 0 et 38 et inférieures à bigNumericFieldsPrecision. Cette valeur par défaut est utilisée uniquement lorsque le champ a un type BigNumeric non paramétré. Veuillez noter qu'il peut y avoir une perte de données si l'échelle des données réelles est supérieure à celle spécifiée. (Facultatif) | Lire/écrire |
Les options peuvent également être définies en dehors du code, à l'aide du paramètre --conf
de spark-submit
ou du paramètre --properties
de gcloud dataproc submit spark
. Pour l'utiliser, ajoutez le préfixe spark.datasource.bigquery.
à l'une des options, par exemple spark.conf.set("temporaryGcsBucket", "some-bucket")
peut également être défini comme --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
.
À l'exception de DATETIME
et TIME
tous les types de données BigQuery dirigés sont mappés dans le type de données Spark SQL correspondant. Voici tous les mappages :
Type de données SQL standard BigQuery | Spark SQL Type de données | Remarques |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Veuillez vous référer à la prise en charge numérique et BigNumeric |
BIGNUMERIC | DecimalType | Veuillez vous référer à la prise en charge numérique et BigNumeric |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark n'a pas de type DATETIME. La chaîne Spark peut être écrite dans une colonne BQ DATETIME existante à condition qu'elle soit au format des littéraux BQ DATETIME. * Pour Spark 3.4+, BQ DATETIME est lu comme le type TimestampNTZ de Spark, c'est-à-dire java LocalDateTime |
TIME | LongType , StringType * | Spark n'a pas de type TIME. Les longs générés, qui indiquent les microsecondes depuis minuit, peuvent être convertis en toute sécurité en TimestampType, mais cela entraîne la déduction de la date comme étant le jour en cours. Ainsi, les temps sont laissés aussi longs et l'utilisateur peut lancer s'il le souhaite. Lors de la diffusion vers Timestamp TIME, les mêmes problèmes de fuseau horaire que DATETIME * La chaîne Spark peut être écrite dans une colonne BQ TIME existante à condition qu'elle soit au format pour les littéraux BQ TIME. |
JSON | StringType | Spark n'a pas de type JSON. Les valeurs sont lues sous forme de chaîne. Afin de réécrire du JSON dans BigQuery, les conditions suivantes sont REQUISES :
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery n'a pas de type MAP. Par conséquent, à l'instar d'autres conversions telles que les tâches Apache Avro et BigQuery Load, le connecteur convertit une Spark Map en REPEATED STRUCT<key,value>. Cela signifie que même si l'écriture et la lecture de cartes sont disponibles, l'exécution d'un code SQL sur BigQuery utilisant la sémantique des cartes n'est pas prise en charge. Pour faire référence aux valeurs de la carte à l'aide de BigQuery SQL, veuillez consulter la documentation BigQuery. En raison de ces incompatibilités, quelques restrictions s'appliquent :
|
Les Spark ML Vector et Matrix sont pris en charge, y compris leurs versions denses et clairsemées. Les données sont enregistrées sous forme d'enregistrement BigQuery. Notez qu'un suffixe est ajouté à la description du champ qui inclut le type d'étincelle du champ.
Pour écrire ces types dans BigQuery, utilisez le format intermédiaire ORC ou Avro et placez-les comme colonne de la ligne (c'est-à-dire pas comme champ dans une structure).
BigNumeric de BigQuery a une précision de 76,76 (le 77e chiffre est partiel) et une échelle de 38. Étant donné que cette précision et cette échelle sont au-delà de la prise en charge de DecimalType (échelle 38 et précision 38) de Spark, cela signifie que les champs BigNumeric avec une précision supérieure à 38 ne peuvent pas être utilisés. . Une fois cette limitation Spark mise à jour, le connecteur sera mis à jour en conséquence.
La conversion Spark Decimal/BigQuery Numeric tente de conserver le paramétrage du type, c'est-à-dire que NUMERIC(10,2)
sera converti en Decimal(10,2)
et vice versa. Notez cependant qu'il existe des cas où les paramètres sont perdus. Cela signifie que les paramètres seront rétablis aux valeurs par défaut - NUMERIC (38,9) et BIGNUMERIC (76,38). Cela signifie qu'à l'heure actuelle, la lecture BigNumeric n'est prise en charge qu'à partir d'une table standard, mais pas à partir de la vue BigQuery ni lors de la lecture de données à partir d'une requête BigQuery.
Le connecteur calcule automatiquement les colonnes et filtre l'instruction SELECT
du DataFrame, par exemple
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
filtre le word
de la colonne et pousse vers le bas le prédicat filtre word = 'hamlet' or word = 'Claudius'
.
Si vous ne souhaitez pas envoyer plusieurs requêtes de lecture à BigQuery, vous pouvez mettre en cache le DataFrame avant de filtrer, par exemple :
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
Vous pouvez également spécifier manuellement l'option filter
, qui remplacera le refoulement automatique et Spark effectuera le reste du filtrage dans le client.
Les pseudo-colonnes _PARTITIONDATE et _PARTITIONTIME ne font pas partie du schéma de la table. Par conséquent, pour interroger les partitions des tables partitionnées, n'utilisez pas la méthode Where() présentée ci-dessus. Ajoutez plutôt une option de filtre de la manière suivante :
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
Par défaut, le connecteur crée une partition tous les 400 Mo dans la table en cours de lecture (avant filtrage). Cela devrait correspondre à peu près au nombre maximal de lecteurs pris en charge par l'API BigQuery Storage. Cela peut être configuré explicitement avec la propriété maxParallelism
. BigQuery peut limiter le nombre de partitions en fonction des contraintes du serveur.
Afin de prendre en charge le suivi de l'utilisation des ressources BigQuery, les connecteurs proposent les options suivantes pour baliser les ressources BigQuery :
Le connecteur peut lancer des tâches de chargement et de requête BigQuery. L'ajout d'étiquettes aux travaux s'effectue de la manière suivante :
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
Cela créera des étiquettes cost_center
= analytics
et usage
= nightly_etl
.
Utilisé pour annoter les sessions de lecture et d'écriture. L’ID de trace est au format Spark:ApplicationName:JobID
. Il s'agit d'une option opt-in et pour l'utiliser, l'utilisateur doit définir la propriété traceApplicationName
. JobID est généré automatiquement par l'ID de tâche Dataproc, avec un repli vers l'ID d'application Spark (tel que application_1648082975639_0001
). L'ID de travail peut être remplacé en définissant l'option traceJobId
. Notez que la longueur totale de l’ID de trace ne peut pas dépasser 256 caractères.
Le connecteur peut être utilisé dans les notebooks Jupyter même s'il n'est pas installé sur le cluster Spark. Il peut être ajouté en tant que fichier jar externe en utilisant le code suivant :
Python:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
Échelle :
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
Dans le cas où le cluster Spark utilise Scala 2.12 (c'est facultatif pour Spark 2.4.x, obligatoire dans 3.0.x), alors le package correspondant est com.google.cloud.spark:spark-bigquery-with-dependencies_ 2.12 :0.41.0. Afin de savoir quelle version de Scala est utilisée, veuillez exécuter le code suivant :
Python:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
Échelle :
scala . util . Properties . versionString
Sauf si vous souhaitez utiliser l'API Scala implicite spark.read.bigquery("TABLE_ID")
, il n'est pas nécessaire de compiler avec le connecteur.
Pour inclure le connecteur dans votre projet :
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark renseigne de nombreuses métriques qui peuvent être trouvées par l'utilisateur final dans la page d'historique Spark. Mais toutes ces métriques sont liées aux étincelles et sont implicitement collectées sans aucun changement de la part du connecteur. Mais peu de métriques sont renseignées à partir de BigQuery et sont actuellement visibles dans les journaux d'application qui peuvent être lues dans les journaux du pilote/exécuteur.
À partir de Spark 3.2, Spark a fourni l'API pour exposer les métriques personnalisées dans la page Spark UI https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /Custommetric.html
Actuellement, en utilisant cette API, Connector expose les métriques BigQuery suivantes pendant la lecture
<style> Tableau # Metricstable TD, Table TH {Word-Break: Break-Word} </ Style>Nom métrique | Description |
---|---|
bytes read | Nombre d'octets BigQuery lus |
rows read | Nombre de lignes BigQuery lu |
scan time | Le temps passé entre la réponse des lignes de lecture demandée à obtenir tous les exécuteurs, en millisecondes. |
parse time | Le temps consacré à l'analyse des lignes a lu tous les exécuteurs, en millisecondes. |
spark time | Le temps passé dans Spark pour traiter les requêtes (c'est-à-dire à part la numérisation et l'analyse), à tous les exécuteurs, en millisecondes. |
Remarque: Pour utiliser les métriques de la page Spark UI, vous devez vous assurer que spark-bigquery-metrics-0.41.0.jar
est le chemin de classe avant de démarrer le serveur historique et la version du connecteur est spark-3.2
ou supérieure.
Voir la documentation de tarification BigQuery.
Vous pouvez définir manuellement le nombre de partitions avec la propriété maxParallelism
. BigQuery peut fournir moins de partitions que vous ne le demandez. Voir Configuration du partitionnement.
Vous pouvez également toujours repartition après avoir lu dans Spark.
S'il y a trop de partitions, les quotas CreateWritestream ou le débit peuvent être dépassés. Cela se produit car si les données de chaque partition sont traitées en série, les partitions indépendantes peuvent être traitées en parallèle sur différents nœuds dans le cluster Spark. Généralement, pour garantir un débit soutenu maximal, vous devez déposer une demande d'augmentation de quota. Cependant, vous pouvez également réduire manuellement le nombre de partitions rédigées en appelant coalesce
sur le dataframe pour atténuer ce problème.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
Une règle de base consiste à avoir une seule poignée de partition au moins 1 Go de données.
Notez également qu'un travail exécuté avec la propriété writeAtLeastOnce
activée ne rencontrera pas les erreurs de quota CreateWritestream.
Le connecteur a besoin d'une instance de googlecredentials afin de se connecter aux API BigQuery. Il existe plusieurs options pour le fournir:
GOOGLE_APPLICATION_CREDENTIALS
, comme décrit ici. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
. AccessTokenProvider
doit être mis en œuvre dans Java ou dans d'autres langues JVM telles que Scala ou Kotlin. Il doit avoir un constructeur sans argage ou un constructeur acceptant un seul argument java.util.String
. Ce paramètre de configuration peut être fourni à l'aide de l'option gcpAccessTokenProviderConfig
. Si cela n'est pas fourni, le constructeur sans argage sera appelé. Le pot contenant l'implémentation doit être sur le chemin de classe du cluster. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
L'identité du compte de service peut être configurée pour un nom d'utilisateur spécifique et un nom de groupe, ou pour tous les utilisateurs par défaut en utilisant les propriétés ci-dessous:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(pas défini par défaut)
L'identification du compte de service pour un utilisateur spécifique.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(pas défini par défaut)
Le compte de service s'approvisionnement d'un groupe spécifique.
gcpImpersonationServiceAccount
(non définie par défaut)
Un imitation de compte de service par défaut pour tous les utilisateurs.
Si l'une des propriétés ci-dessus est définie, le compte de service spécifié sera usurpé en générant des informations d'identification de courte durée lors de l'accès à BigQuery.
Si plusieurs propriétés sont définies, le compte de service associé au nom d'utilisateur aura priorité sur le compte de service associé au nom de groupe pour un utilisateur et un groupe correspondants, qui à son tour auront la priorité sur l'identité du compte de service par défaut.
Pour une application plus simple, où une actualisation du jeton d'accès n'est pas requise, une autre alternative consiste à passer le jeton d'accès comme l'option de configuration gcpAccessToken
. Vous pouvez obtenir le jeton d'accès en exécutant gcloud auth application-default print-access-token
.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
IMPORTANT: L' CredentialsProvider
et AccessTokenProvider
doivent être mis en œuvre en Java ou dans un autre langage JVM tel que Scala ou Kotlin. Le pot contenant l'implémentation doit être sur le chemin de classe du cluster.
AVIS: Une seule des options ci-dessus doit être fournie.
Pour vous connecter à un proxy vers l'avant et pour authentifier les informations d'identification de l'utilisateur, configurez les options suivantes.
proxyAddress
: Adresse du serveur proxy. Le proxy doit être un proxy HTTP et l'adresse doit être dans le format host:port
.
proxyUsername
: le nom d'utilisateur utilisé pour se connecter au proxy.
proxyPassword
: le mot de passe utilisé pour se connecter au proxy.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
Les mêmes paramètres de proxy peuvent également être définis à l'échelle mondiale à l'aide de RuntimeConfig de Spark comme ceci:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
Vous pouvez également définir ce qui suit dans la configuration Hadoop.
fs.gs.proxy.address
(similaire à "ProxyAddress"), fs.gs.proxy.username
(similaire à "proxyUsername") et fs.gs.proxy.password
(similaire à "proxyPassword").
Si le même paramètre est défini à plusieurs endroits, l'ordre de priorité est le suivant:
Option ("Key", "Value")> Spark.conf> Configuration Hadoop