コネクタは、Google BigQuery テーブルの Spark の DataFrame への読み取りと、DataFrame の BigQuery への書き込みをサポートしています。これは、Spark SQL データ ソース API を使用して BigQuery と通信することによって行われます。
Storage API は、Google Cloud Storage を仲介せずに、gRPC 経由で BigQuery から直接データを並列ストリーミングします。
以前のエクスポートベースの読み取りフローを使用するよりも多くの利点があり、一般に読み取りパフォーマンスの向上につながります。
Google Cloud Storage には一時ファイルが残りません。行は、Arrow または Avro ワイヤ形式を使用して BigQuery サーバーから直接読み取られます。
新しい API を使用すると、列と述語のフィルタリングで、関心のあるデータのみを読み取ることができます。
BigQuery は列指向データストアによって支えられているため、すべての列を読み取らなくてもデータを効率的にストリーミングできます。
Storage API は、述語フィルターの任意のプッシュダウンをサポートします。コネクタ バージョン 0.8.0 ベータ以降では、BigQuery への任意のフィルタのプッシュダウンがサポートされています。
Spark には、ネストされたフィールドでフィルターのプッシュダウンを許可しない既知の問題があります。たとえば、 address.city = "Sunnyvale"
のようなフィルタは Bigquery にプッシュダウンされません。
API は、すべてのリーダーが完了するまで、リーダー間のレコードのバランスを再調整します。これは、すべてのマップ フェーズがほぼ同時に終了することを意味します。動的シャーディングが Google Cloud Dataflow でどのように使用されるかについては、このブログ記事をご覧ください。
詳細については、「パーティショニングの構成」を参照してください。
以下の指示に従ってください。
Apache Spark 環境がない場合は、事前構成された認証を使用して Cloud Dataproc クラスタを作成できます。次の例では、Cloud Dataproc を使用していることを前提としていますが、どのクラスタでもspark-submit
を使用できます。
API を使用する Dataproc クラスタには、「bigquery」または「cloud-platform」スコープが必要です。 Dataproc クラスタにはデフォルトで「bigquery」スコープがあるため、有効なプロジェクト内のほとんどのクラスタはデフォルトで動作するはずです。
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
コネクタの最新バージョンは、次のリンクから公開されています。
バージョン | リンク |
---|---|
スパーク3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (HTTP リンク) |
スパーク3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (HTTP リンク) |
スパーク3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (HTTP リンク) |
スパーク3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (HTTP リンク) |
スパーク 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (HTTP リンク) |
スパーク2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (HTTP リンク) |
スカラ 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (HTTP リンク) |
スカラ 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (HTTP リンク) |
スカラ 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP リンク) |
最初の 6 つのバージョンは、Spark の新しいデータ ソース API (データ ソース API v2) 上に構築されたすべての Scala バージョンの Spark 2.4/3.1/3.2/3.3/3.4/3.5 をターゲットとする Java ベースのコネクタです。
最後の 2 つのコネクタは Scala ベースのコネクタです。以下に説明するように、Spark インストールに関連する jar を使用してください。
コネクタ スパーク | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
スパーク-3.5-bigquery | ✓ | |||||||
スパーク-3.4-bigquery | ✓ | ✓ | ||||||
スパーク-3.3-bigquery | ✓ | ✓ | ✓ | |||||
スパーク-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
スパーク-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
スパーク-2.4-bigquery | ✓ | |||||||
スパークビッグクエリと依存関係_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
スパークビッグクエリと依存関係_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
スパークビッグクエリと依存関係_2.11 | ✓ | ✓ |
コネクタ Dataproc イメージ | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | サーバーレス 画像1.0 | サーバーレス イメージ2.0 | サーバーレス 画像2.1 | サーバーレス 画像2.2 |
---|---|---|---|---|---|---|---|---|---|---|
スパーク-3.5-bigquery | ✓ | ✓ | ||||||||
スパーク-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
スパーク-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
スパーク-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
スパーク-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
スパーク-2.4-bigquery | ✓ | ✓ | ||||||||
スパークビッグクエリと依存関係_2.13 | ✓ | ✓ | ✓ | |||||||
スパークビッグクエリと依存関係_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
スパークビッグクエリと依存関係_2.11 | ✓ | ✓ |
このコネクタは、Maven Central リポジトリから入手することもできます。 --packages
オプションまたはspark.jars.packages
構成プロパティを使用して使用できます。次の値を使用します
バージョン | コネクタアーティファクト |
---|---|
スパーク3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
スパーク3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
スパーク3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
スパーク3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
スパーク 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
スパーク2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
スカラ 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
スカラ 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
スカラ 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
イメージ 2.1 以降を使用して作成された Dataproc クラスタ、または Dataproc サーバーレス サービスを使用したバッチには、組み込みの Spark BigQuery コネクタが付属しています。この場合、組み込みコネクタが優先されるため、標準の--jars
または--packages
(または、 spark.jars
/ spark.jars.packages
構成) の使用は役に立ちません。
組み込みバージョン以外の別のバージョンを使用するには、次のいずれかを実行してください。
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
、または--metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
して、別の jar を使用してクラスターを作成します。 URL は、クラスターの Spark バージョンの有効なコネクタ JAR を指すことができます。--properties dataproc.sparkBqConnector.version=0.41.0
、または--properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
別の jar でバッチを作成するには--properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
します。 URL は、ランタイムの Spark バージョンの有効なコネクタ JAR を指すことができます。 次のコマンドを実行すると、コンパイルせずに API に対して単純な PySpark ワードカウントを実行できます。
Dataproc イメージ 1.5 以降
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Dataproc イメージ 1.4 以前
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
コネクタは、クロスランゲージ Spark SQL データ ソース API を使用します。
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
または Scala のみの暗黙的な API:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
詳細については、Python、Scala、Java の追加のコード サンプルを参照してください。
コネクタを使用すると、BigQuery で標準 SQL SELECT クエリを実行し、その結果を Spark データフレームに直接フェッチできます。これは、次のコード サンプルで説明されているように簡単に実行できます。
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
結果が得られるのは
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
2 番目のオプションは、次のようにquery
オプションを使用することです。
df = spark.read.format("bigquery").option("query", sql).load()
結果のみがネットワーク経由で送信されるため、実行が高速になることに注意してください。同様の方法で、Spark で結合を実行するより効率的にクエリに JOIN を含めたり、サブクエリ、BigQuery ユーザー定義関数、ワイルドカード テーブル、BigQuery ML などの他の BigQuery 機能を使用したりできます。
この機能を使用するには、次の構成を設定する必要があります。
viewsEnabled
true
に設定する必要があります。materializationDataset
GCP ユーザーがテーブル作成権限を持つデータセットに設定する必要があります。 materializationProject
オプションです。注: BigQuery のドキュメントで説明されているように、クエリされるテーブルはmaterializationDataset
と同じ場所にある必要があります。また、 SQL statement
内のテーブルがparentProject
以外のプロジェクトからのものである場合は、完全修飾テーブル名 ( [project].[dataset].[table]
を使用します。
重要:この機能は、BigQuery でクエリを実行し、結果を一時テーブルに保存することで実装され、Spark はそこから結果を読み取ります。これにより、BigQuery アカウントに追加のコストが追加される可能性があります。
コネクタには、BigQuery ビューからの読み取りに対する予備的なサポートがあります。いくつかの注意点があることに注意してください。
collect()
またはcount()
アクションを実行する前であっても、読み取りパフォーマンスに影響します。materializationProject
およびmaterializationDataset
オプションによって構成できます。これらのオプションは、ビューを読み取る前にspark.conf.set(...)
を呼び出すことでグローバルに設定することもできます。.option("viewsEnabled", "true")
) を読み取るときに viewsEnabled オプションを設定するか、 spark.conf.set("viewsEnabled", "true")
呼び出してグローバルに設定します。materializationDataset
ビューと同じ場所にある必要があります。BigQuery への DataFrame の書き込みは、直接と間接の 2 つの方法を使用して実行できます。
この方法では、BigQuery Storage Write API を使用してデータが BigQuery に直接書き込まれます。このオプションを有効にするには、以下に示すように、 writeMethod
オプションをdirect
に設定してください。
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
APPEND 保存モードおよび OVERWRITE モード(日付と範囲パーティションのみ)での既存のパーティション分割テーブル(日付パーティション、取り込み時間パーティション、範囲パーティション)への書き込みは、コネクタと BigQuery Storage Write API で完全にサポートされています。以下で説明するdatePartition
、 partitionField
、 partitionType
、 partitionRangeStart
、 partitionRangeEnd
、 partitionRangeInterval
の使用は、現時点では直接書き込み方式ではサポートされていません。
重要: BigQuery Storage Write API の料金については、データ取り込みの料金ページを参照してください。
重要:以前のバージョンには、場合によってはテーブルの削除を引き起こす可能性があるバグがあるため、直接書き込みにはバージョン 0.24.2 以降を使用してください。
このメソッドでは、データは最初に GCS に書き込まれ、次に BigQuery にロードされます。 GCS バケットは、一時的なデータの場所を示すように構成する必要があります。
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
データは、Apache Parquet、Apache ORC、または Apache Avro 形式を使用して一時的に保存されます。
GCS バケットと形式は、次のように Spark の RuntimeConfig を使用してグローバルに設定することもできます。
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
DataFrame を BigQuery にストリーミングする場合、各バッチは非ストリーミング DataFrame と同じ方法で書き込まれます。 HDFS と互換性のあるチェックポイントの場所 (例: path/to/HDFS/dir
またはgs://checkpoint-bucket/checkpointDir
) を指定する必要があることに注意してください。
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
重要:別の GCS コネクタが存在する場合、そのコネクタとの競合を避けるために、コネクタは GCS コネクタを構成しません。コネクタの書き込み機能を使用するには、ここで説明されているようにクラスター上で GCS コネクタを構成してください。
API は、読み取りを構成するための多数のオプションをサポートしています。
<style> table#propertytable td, table th { word-break:break-word } </style>財産 | 意味 | 使用法 |
---|---|---|
table | [[project:]dataset.]table 形式の BigQuery テーブル。代わりに、 load() / save() のpath パラメーターを使用することをお勧めします。このオプションは非推奨となっており、将来のバージョンでは削除される予定です。(非推奨) | 読み取り/書き込み |
dataset | テーブルを含むデータセット。このオプションは標準のテーブルとビューで使用する必要がありますが、クエリ結果をロードする場合は使用しないでください。 ( table で省略されない限りオプション) | 読み取り/書き込み |
project | テーブルの Google Cloud プロジェクト ID。このオプションは標準のテーブルとビューで使用する必要がありますが、クエリ結果をロードする場合は使用しないでください。 (オプション。デフォルトは使用されているサービスアカウントのプロジェクト) | 読み取り/書き込み |
parentProject | エクスポートに対して請求するテーブルの Google Cloud プロジェクト ID。 (オプション。デフォルトは使用されているサービスアカウントのプロジェクト) | 読み取り/書き込み |
maxParallelism | データを分割するパーティションの最大数。 BigQuery がデータが十分に小さいと判断した場合、実際の数はこれより少なくなる可能性があります。パーティションごとにリーダーをスケジュールするのに十分なエグゼキュータがない場合、一部のパーティションが空になる可能性があります。 重要:古いパラメータ ( parallelism ) は引き続きサポートされていますが、非推奨モードです。コネクタのバージョン 1.0 では削除される予定です。(オプション。デフォルトはpreferredMinParallelismと20,000の大きい方です)。 | 読む |
preferredMinParallelism | データを分割するパーティションの推奨最小数。 BigQuery がデータが十分に小さいと判断した場合、実際の数はこれより少なくなる可能性があります。パーティションごとにリーダーをスケジュールするのに十分なエグゼキュータがない場合、一部のパーティションが空になる可能性があります。 (オプション。デフォルトは、アプリケーションのデフォルトの並列処理と maxParallelism の 3 倍の最小値です。) | 読む |
viewsEnabled | コネクタがテーブルだけでなくビューから読み取ることができるようにします。このオプションを有効にする前に、関連するセクションをお読みください。 (オプション。デフォルトは false ) | 読む |
materializationProject | マテリアライズド ビューが作成されるプロジェクト ID (オプション。デフォルトはビューのプロジェクト ID) | 読む |
materializationDataset | マテリアライズド ビューが作成されるデータセット。このデータセットは、ビューまたはクエリされたテーブルと同じ場所にある必要があります。 (オプション。デフォルトはビューのデータセット) | 読む |
materializationExpirationTimeInMinutes | ビューまたはクエリの実体化されたデータを保持する一時テーブルの有効期限 (分単位)。コネクタはローカル キャッシュを使用するため、また BigQuery の計算を削減するために一時テーブルを再利用する場合があるため、非常に低い値を指定するとエラーが発生する可能性があることに注意してください。値は正の整数である必要があります。 (オプション。デフォルトは 1440、つまり 24 時間) | 読む |
readDataFormat | BigQuery から読み取るデータ形式。オプション: ARROW 、 AVRO (オプション。デフォルトは ARROW ) | 読む |
optimizedEmptyProjection | コネクタは、 count() の実行に使用される、最適化された空の射影 (列を含まない選択) ロジックを使用します。このロジックは、テーブルのメタデータから直接データを取得するか、フィルターがある場合に非常に効率的な `SELECT COUNT(*) WHERE...` を実行します。このオプションをfalse に設定すると、このロジックの使用をキャンセルできます。(オプション、デフォルトは true ) | 読む |
pushAllFilters | true に設定すると、コネクタは Spark が BigQuery Storage API に委任できるすべてのフィルタをプッシュします。これにより、BigQuery Storage API サーバーから Spark クライアントに送信する必要があるデータの量が削減されます。このオプションは非推奨となっており、将来のバージョンでは削除される予定です。(オプション、デフォルトは true )(非推奨) | 読む |
bigQueryJobLabel | コネクタが開始したクエリにラベルを追加し、BigQuery ジョブを読み込むために使用できます。ラベルは複数設定できます。 (オプション) | 読む |
bigQueryTableLabel | テーブルへの書き込み中にテーブルにラベルを追加するために使用できます。ラベルは複数設定できます。 (オプション) | 書く |
traceApplicationName | BigQuery Storage の読み取りおよび書き込みセッションをトレースするために使用されるアプリケーション名。セッションにトレース ID を設定するには、アプリケーション名の設定が必要です。 (オプション) | 読む |
traceJobId | BigQuery Storage の読み取りおよび書き込みセッションをトレースするために使用されるジョブ ID。 (オプション。デフォルトでは Dataproc ジョブ ID が存在します。それ以外の場合は Spark アプリケーション ID を使用します) | 読む |
createDisposition | ジョブが新しいテーブルを作成できるかどうかを指定します。許可される値は次のとおりです。
(オプション。デフォルトは CREATE_IF_NEEDED です)。 | 書く |
writeMethod | データを BigQuery に書き込む方法を制御します。使用可能な値は、BigQuery Storage Write API を使用するdirect 値と、最初にデータを GCS に書き込んでから BigQuery の読み込みオペレーションをトリガーするindirect です。詳細はこちらをご覧ください(オプション、デフォルトは indirect ) | 書く |
writeAtLeastOnce | データが少なくとも 1 回 BigQuery に書き込まれることを保証します。これは、確実に 1 回だけ保証されるよりも低い保証です。これは、データが小さなバッチで継続的に書き込まれるストリーミング シナリオに適しています。 (オプション。デフォルトは false )「DIRECT」書き込みメソッドでのみサポートされており、モードは「上書き」ではありません。 | 書く |
temporaryGcsBucket | BigQuery に読み込まれる前にデータを一時的に保持する GCS バケット。 Spark 構成 ( spark.conf.set(...) ) で設定されていない限り必須です。「DIRECT」書き込み方式ではサポートされていません。 | 書く |
persistentGcsBucket | BigQuery に読み込まれる前のデータを保持する GCS バケット。通知された場合、BigQuery へのデータの書き込み後にデータは削除されません。 「DIRECT」書き込み方式ではサポートされていません。 | 書く |
persistentGcsPath | BigQuery に読み込まれる前のデータを保持する GCS パス。 persistentGcsBucket でのみ使用されます。「DIRECT」書き込み方式ではサポートされていません。 | 書く |
intermediateFormat | BigQuery に読み込まれる前のデータの形式。値は「parquet」、「orc」、または「avro」のいずれかです。 Avro 形式を使用するには、spark-avro パッケージを実行時に追加する必要があります。 (オプション。デフォルトは parquet 細工です)。書き込み時のみ。 「INDIRECT」書き込みメソッドでのみサポートされます。 | 書く |
useAvroLogicalTypes | Avro (`.option("intermediateFormat", "avro")`) から読み込む場合、BigQuery は論理型の代わりに基礎となる Avro 型を使用します [デフォルト](https://cloud.google.com/bigquery/docs/)ロードデータクラウドストレージavro#logical_types)。このオプションを指定すると、Avro 論理型が対応する BigQuery データ型に変換されます。 (オプション。デフォルトは false )。書き込み時のみ。 | 書く |
datePartition | データが書き込まれる日付パーティション。 YYYYMMDD 形式で指定された日付文字列である必要があります。次のように、単一パーティションのデータを上書きするために使用できます。
(オプション)。書き込み時のみ。 次のようなさまざまなパーティション タイプでも使用できます。 時間: YYYYMMDDHH 月: YYYYMM 年: YYYY 「DIRECT」書き込み方式ではサポートされていません。 | 書く |
partitionField | このフィールドが指定されている場合、テーブルはこのフィールドによってパーティション化されます。 時間分割の場合は、オプション `partitionType` とともに指定します。 整数範囲パーティショニングの場合は、`partitionRangeStart`、`partitionRangeEnd、`partitionRangeInterval`の 3 つのオプションとともに指定します。 このフィールドは、時間パーティショニングの場合は最上位の TIMESTAMP または DATE フィールド、整数範囲パーティショニングの場合は INT64 である必要があります。そのモードはNULLABLEまたはREQUIREDである必要があります。このオプションが時間パーティション テーブルに設定されていない場合、テーブルは疑似列によってパーティション化され、 '_PARTITIONTIME' as TIMESTAMP または'_PARTITIONDATE' as DATE を介して参照されます。(オプション)。 「DIRECT」書き込み方式ではサポートされていません。 | 書く |
partitionExpirationMs | テーブル内のパーティションのストレージを保持するミリ秒数。パーティション内のストレージの有効期限は、そのパーティション時間にこの値を加えた時間になります。 (オプション)。 「DIRECT」書き込み方式ではサポートされていません。 | 書く |
partitionType | 時間分割を指定するために使用されます。 サポートされているタイプは次のとおりです: HOUR, DAY, MONTH, YEAR このオプションは、ターゲットテーブルを時間パーティション化するために必須です。 (オプション。PartitionField が指定されている場合、デフォルトは DAY)。 「DIRECT」書き込み方式ではサポートされていません。 | 書く |
partitionRangeStart 、 partitionRangeEnd 、 partitionRangeInterval | 整数範囲のパーティショニングを指定するために使用されます。 これらのオプションは、ターゲット テーブルを整数範囲パーティション化するために必須です。 3 つのオプションをすべて指定する必要があります。 「DIRECT」書き込み方式ではサポートされていません。 | 書く |
clusteredFields | カンマで区切られた、繰り返しのない最上位の列の文字列。 (オプション)。 | 書く |
allowFieldAddition | ALLOW_FIELD_ADDITION SchemaUpdateOption を BigQuery LoadJob に追加します。許可される値はtrue とfalse です。(オプション。デフォルトは false )。「INDIRECT」書き込みメソッドでのみサポートされます。 | 書く |
allowFieldRelaxation | ALLOW_FIELD_RELAXATION SchemaUpdateOption を BigQuery LoadJob に追加します。許可される値はtrue とfalse です。(オプション。デフォルトは false )。「INDIRECT」書き込みメソッドでのみサポートされます。 | 書く |
proxyAddress | プロキシサーバーのアドレス。プロキシは HTTP プロキシである必要があり、アドレスは「host:port」形式である必要があります。あるいは、Spark 構成 ( spark.conf.set(...) ) または Hadoop 構成 ( fs.gs.proxy.address ) で設定することもできます。(オプション。プロキシ経由で GCP に接続する場合にのみ必要です。) | 読み取り/書き込み |
proxyUsername | プロキシへの接続に使用される userName。あるいは、Spark 構成 ( spark.conf.set(...) ) または Hadoop 構成 ( fs.gs.proxy.username ) で設定することもできます。(オプション。認証を使用してプロキシ経由で GCP に接続する場合にのみ必要です。) | 読み取り/書き込み |
proxyPassword | プロキシへの接続に使用されるパスワード。あるいは、Spark 構成 ( spark.conf.set(...) ) または Hadoop 構成 ( fs.gs.proxy.password ) で設定することもできます。(オプション。認証を使用してプロキシ経由で GCP に接続する場合にのみ必要です。) | 読み取り/書き込み |
httpMaxRetry | BigQuery への低レベル HTTP リクエストの最大再試行数。あるいは、Spark 構成 ( spark.conf.set("httpMaxRetry", ...) ) または Hadoop 構成 ( fs.gs.http.max.retry ) で設定することもできます。(オプション。デフォルトは10) | 読み取り/書き込み |
httpConnectTimeout | BigQuery との接続を確立するためのタイムアウト (ミリ秒単位)。あるいは、Spark 構成 ( spark.conf.set("httpConnectTimeout", ...) ) または Hadoop 構成 ( fs.gs.http.connect-timeout ) で設定することもできます。(オプション。デフォルトは 60000 ミリ秒です。無限のタイムアウトの場合は 0、負の数の場合は 20000) | 読み取り/書き込み |
httpReadTimeout | 確立された接続からデータを読み取るときのタイムアウト (ミリ秒単位)。あるいは、Spark 構成 ( spark.conf.set("httpReadTimeout", ...) ) または Hadoop 構成 ( fs.gs.http.read-timeout ) で設定することもできます。(オプション。デフォルトは 60000 ミリ秒です。無限のタイムアウトの場合は 0、負の数の場合は 20000) | 読む |
arrowCompressionCodec | Arrow 形式を使用する場合の BigQuery テーブルからの読み取り時の圧縮コーデック。オプション: ZSTD (Zstandard compression) 、 LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) 、 COMPRESSION_UNSPECIFIED 。 Java を使用する場合、推奨される圧縮コーデックはZSTD です。(オプション。デフォルトは COMPRESSION_UNSPECIFIED で、圧縮は使用されないことを意味します) | 読む |
responseCompressionCodec | ReadRowsResponse データの圧縮に使用される圧縮コーデック。オプション: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED 、 RESPONSE_COMPRESSION_CODEC_LZ4 (オプション。デフォルトは RESPONSE_COMPRESSION_CODEC_UNSPECIFIED で、圧縮は使用されないことを意味します) | 読む |
cacheExpirationTimeInMinutes | クエリ情報を保存するメモリ内キャッシュの有効期限。 キャッシュを無効にするには、値を 0 に設定します。 (オプション。デフォルトは 15 分) | 読む |
enableModeCheckForSchemaFields | DIRECT 書き込み中に、宛先スキーマのすべてのフィールドのモードが、対応するソース フィールド スキーマのモードと等しいかどうかを確認します。 デフォルト値は true です。つまり、チェックはデフォルトで行われます。 false に設定すると、モード チェックは無視されます。 | 書く |
enableListInference | 特にモードが Parquet の場合にスキーマ推論を使用するかどうかを示します (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions)。 デフォルトは false です。 | 書く |
bqChannelPoolSize | BigQueryReadClient によって作成される gRPC チャネル プールの(固定)サイズ。 最適なパフォーマンスを得るには、これを少なくともクラスター エグゼキューターのコア数に設定する必要があります。 | 読む |
createReadSessionTimeoutInSeconds | テーブルの読み取り時に ReadSession を作成するためのタイムアウト (秒単位)。 非常に大きなテーブルの場合は、この値を増やす必要があります。 (オプション。デフォルトは 600 秒) | 読む |
queryJobPriority | BigQuery クエリからデータを読み取るときにジョブに設定される優先度レベル。許可される値は次のとおりです。
(オプション。デフォルトは INTERACTIVE ) | 読み取り/書き込み |
destinationTableKmsKeyName | 宛先 BigQuery テーブルを保護するために使用される Cloud KMS 暗号化キーについて説明します。プロジェクトに関連付けられた BigQuery サービス アカウントは、この暗号化キーにアクセスする必要があります。 BigQuery での CMEK の使用の詳細については、[こちら](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id) を参照してください。 注意:テーブルは、コネクタによって作成された場合にのみキーによって暗号化されます。このオプションを設定しただけでは、既存の暗号化されていないテーブルは暗号化されません。 (オプション) | 書く |
allowMapTypeConversion | フィールド名がkey およびvalue である 2 つのサブフィールドがレコードにある場合に、BigQuery レコードから Spark MapType への変換を無効にするブール構成。デフォルト値はtrue で、変換が許可されます。(オプション) | 読む |
spark.sql.sources.partitionOverwriteMode | テーブルが範囲/時間パーティション化されている場合に、書き込み時に上書きモードを指定するための構成。現在サポートされているモードはSTATIC とDYNAMIC の 2 つです。 STATIC モードでは、テーブル全体が上書きされます。 DYNAMIC モードでは、データは既存のテーブルのパーティションによって上書きされます。デフォルト値はSTATIC です。(オプション) | 書く |
enableReadSessionCaching | 読み取りセッションのキャッシュを無効にするブール設定。 BigQuery の読み取りセッションをキャッシュして、Spark クエリの計画を高速化できるようにします。デフォルト値はtrue です。(オプション) | 読む |
readSessionCacheDurationMins | 読み取りセッションのキャッシュ期間を分単位で設定するように構成します。 enableReadSessionCaching がtrue (デフォルト) の場合にのみ機能します。読み取りセッションをキャッシュする期間を指定できます。許可される最大値は300 です。デフォルト値は5 です。(オプション) | 読む |
bigQueryJobTimeoutInMinutes | BigQuery ジョブのタイムアウトを分単位で設定するように構成します。デフォルト値は360 分です。(オプション) | 読み取り/書き込み |
snapshotTimeMillis | テーブルのスナップショットの読み取りに使用するタイムスタンプをミリ秒単位で指定します。デフォルトでは、これは設定されておらず、テーブルの最新バージョンが読み取られます。 (オプション) | 読む |
bigNumericDefaultPrecision | BigQuery のデフォルトは Spark には広すぎるため、BigNumeric フィールドの代替デフォルト精度。値は 1 ~ 38 です。このデフォルトは、フィールドがパラメータ化されていない BigNumeric 型を持つ場合にのみ使用されます。実際のデータの精度が指定された精度よりも高い場合、データが失われる可能性があることに注意してください。 (オプション) | 読み取り/書き込み |
bigNumericDefaultScale | BigNumeric フィールドの代替デフォルト スケール。値は 0 ~ 38 の範囲で、bigNumericFieldsPrecision 未満にすることができます。このデフォルトは、フィールドにパラメータ化されていない BigNumeric 型がある場合にのみ使用されます。実際のデータの規模が指定されたものより大きい場合、データが失われる可能性があることに注意してください。 (オプション) | 読み取り/書き込み |
オプションは、 spark-submit
の--conf
パラメータまたはgcloud dataproc submit spark
の--properties
パラメータを使用して、コードの外部で設定することもできます。これを使用するには、接頭辞spark.datasource.bigquery.
たとえば、 spark.conf.set("temporaryGcsBucket", "some-bucket")
を--conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
として設定することもできます。
DATETIME
とTIME
を除くすべての BigQuery データ型は、対応する Spark SQL データ型に直接マッピングされます。すべてのマッピングは次のとおりです。
BigQuery 標準 SQL データ型 | スパークSQL データ型 | 注意事項 |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Numeric および BigNumeric のサポートを参照してください。 |
BIGNUMERIC | DecimalType | Numeric および BigNumeric のサポートを参照してください。 |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType 、 TimestampNTZType * | Spark には DATETIME 型がありません。 BQ DATETIME リテラルの形式であれば、Spark 文字列を既存の BQ DATETIME 列に書き込むことができます。 * Spark 3.4 以降の場合、BQ DATETIME は Spark の TimestampNTZ タイプ、つまり java LocalDateTime として読み取られます。 |
TIME | LongType 、 StringType * | Spark には TIME タイプがありません。生成された Long は午前 0 時からのマイクロ秒を示し、安全に TimestampType にキャストできますが、これにより日付が現在の日として推測されます。したがって、時間は長く残され、ユーザーは必要に応じてキャストできます。 タイムスタンプ TIME にキャストすると、DATETIME と同じ TimeZone の問題が発生します * Spark 文字列は、BQ TIME リテラルの形式であれば、既存の BQ TIME 列に書き込むことができます。 |
JSON | StringType | Spark には JSON タイプがありません。値は文字列として読み取られます。 JSON を BigQuery に書き戻すには、次の条件が必要です。
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery には MAP タイプがないため、Apache Avro や BigQuery Load ジョブなどの他の変換と同様に、コネクタは Spark マップを REPEATED STRUCT<key,value> に変換します。つまり、マップの書き込みと読み取りは可能ですが、マップ セマンティクスを使用する BigQuery での SQL の実行はサポートされていません。 BigQuery SQL を使用してマップの値を参照するには、BigQuery のドキュメントを確認してください。これらの非互換性のため、いくつかの制限が適用されます。
|
Spark ML Vector と Matrix は、密バージョンと疎バージョンを含めてサポートされています。データは BigQuery RECORD として保存されます。フィールドの説明に、フィールドのスパーク タイプを含むサフィックスが追加されていることに注意してください。
これらの型を BigQuery に書き込むには、ORC または Avro 中間形式を使用し、それらを行の列として(つまり、構造体のフィールドではなく)含めます。
BigQuery の BigNumeric の精度は 76.76 (77 桁目は部分的)、位取りは 38 です。この精度と位取りは、Spark の DecimalType (位取り 38 と精度 38) のサポートを超えているため、精度が 38 を超える BigNumeric フィールドは使用できないことを意味します。 。この Spark の制限が更新されると、それに応じてコネクタも更新されます。
Spark Decimal/BigQuery Numeric 変換は、型のパラメータ化を保持しようとします。つまり、 NUMERIC(10,2)
Decimal(10,2)
に変換され、その逆も同様です。ただし、パラメータが失われる場合があることに注意してください。これは、パラメータがデフォルトの NUMERIC (38,9) および BIGNUMERIC(76,38) に戻ることを意味します。つまり、現時点では BigNumeric 読み取りは標準テーブルからのみサポートされており、BigQuery ビューや BigQuery クエリからデータを読み取る場合にはサポートされていません。
コネクタは自動的に列を計算し、DataFrame のSELECT
ステートメントをプッシュダウン フィルターします。
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
列word
にフィルターを適用し、述語フィルターword = 'hamlet' or word = 'Claudius'
をプッシュダウンします。
BigQuery に対して複数の読み取りリクエストを行いたくない場合は、フィルタリングする前に DataFrame をキャッシュできます。例:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
filter
オプションを手動で指定することもできます。これにより、自動プッシュダウンがオーバーライドされ、Spark が残りのフィルター処理をクライアントで実行します。
疑似列 _PARTITIONDATE および _PARTITIONTIME はテーブル スキーマの一部ではありません。したがって、パーティション化されたテーブルのパーティションごとにクエリを実行するには、上記の where() メソッドを使用しないでください。代わりに、次の方法でフィルター オプションを追加します。
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
デフォルトでは、コネクタは、読み取られるテーブル内に 400MB ごとに 1 つのパーティションを作成します (フィルタリング前)。これは、BigQuery Storage API でサポートされるリーダーの最大数にほぼ対応します。これは、 maxParallelism
プロパティを使用して明示的に構成できます。 BigQuery は、サーバーの制約に基づいてパーティションの数を制限する場合があります。
BigQuery リソースの使用状況の追跡をサポートするために、コネクタは BigQuery リソースにタグを付ける次のオプションを提供します。
コネクタは、BigQuery の読み込みジョブとクエリ ジョブを起動できます。ジョブへのラベルの追加は次の方法で行われます。
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
これにより、ラベルcost_center
= analytics
とusage
= nightly_etl
が作成されます。
読み取りおよび書き込みセッションに注釈を付けるために使用されます。トレース ID の形式はSpark:ApplicationName:JobID
です。これはオプトイン オプションであり、これを使用するには、ユーザーは、 traceApplicationName
プロパティを設定する必要があります。 JobID は Dataproc ジョブ ID によって自動生成され、Spark アプリケーション ID( application_1648082975639_0001
など)にフォールバックされます。ジョブ ID は、 traceJobId
オプションを設定することでオーバーライドできます。トレース ID の合計の長さは 256 文字を超えることができないことに注意してください。
コネクタは、Spark クラスターにインストールされていない場合でも、Jupyter ノートブックで使用できます。次のコードを使用して、外部 jar として追加できます。
パイソン:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
スカラ:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
Spark クラスターが Scala 2.12 を使用している場合 (Spark 2.4.x ではオプション、3.0.x では必須)、関連するパッケージは com.google.cloud.spark:spark-bigquery-with-dependency_ 2.12 :0.41.0 です。使用されている Scala バージョンを確認するには、次のコードを実行してください。
パイソン:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
スカラ:
scala . util . Properties . versionString
暗黙的な Scala API spark.read.bigquery("TABLE_ID")
を使用しない限り、コネクタに対してコンパイルする必要はありません。
プロジェクトにコネクタを含めるには:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark では、エンド ユーザーが Spark 履歴ページで見つけることができる多くのメトリクスが設定されます。ただし、これらのメトリクスはすべて Spark 関連であり、コネクタから変更を加えることなく暗黙的に収集されます。ただし、BigQuery から入力され、現在アプリケーション ログに表示され、ドライバー/エグゼキュータ ログで読み取ることができる指標はほとんどありません。
Spark 3.2 以降、Spark は、Spark UI ページ https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric でカスタム メトリクスを公開する API を提供しています。 /カスタムメトリック.html
現在、このAPIを使用して、コネクタは読み取り中に次のBigQueryメトリックを公開します
<style> Table#MetricStable TD、Table TH {word-break:break-word} </style>メトリクス名 | 説明 |
---|---|
bytes read | BigQueryバイトの数が読み取られます |
rows read | BigQueryの列の数が読まれます |
scan time | 読み取り行の応答間で費やされた時間は、すべての執行者で取得するように要求され、ミリ秒単位で。 |
parse time | すべての執行者で読み取られた行を解析するために費やされる時間は、ミリ秒単位で。 |
spark time | スパークに費やされた時間の量は、クエリ(すなわち、スキャンと解析を除いて、すべての執行者全体でミリ秒単位で処理します。 |
注: Spark UIページでメトリックを使用するには、 spark-bigquery-metrics-0.41.0.jar
が歴史サーバーを開始する前のクラスパスであり、コネクタバージョンがspark-3.2
以上であることを確認する必要があります。
BigQueryの価格設定文書を参照してください。
maxParallelism
プロパティを使用して、パーティションの数を手動で設定できます。 BigQueryは、あなたが求めるよりも少ないパーティションを提供する場合があります。パーティション化の構成を参照してください。
また、Sparkで読んだ後は常に再パーティションを行うこともできます。
パーティションが多すぎると、createwRiteStreamまたはスループットクォータを超えることがあります。これは、各パーティション内のデータが連続的に処理されている間、独立したパーティションがSparkクラスター内の異なるノードで並行して処理される可能性があるためです。一般に、最大の持続的なスループットを確保するには、クォータの増加要求を提出する必要があります。ただし、この問題を軽減するために、データフレームでcoalesce
を呼び出すことにより、記述されるパーティションの数を手動で減らすこともできます。
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
経験則では、少なくとも1GBのデータを1つのパーティションハンドルにすることです。
また、 writeAtLeastOnce
プロパティをオンにして実行されているジョブは、CreateWRiteStreamクォータエラーに遭遇しないことに注意してください。
コネクタには、BigQuery APIに接続するために、GoogleCredentionsのインスタンスが必要です。それを提供するための複数のオプションがあります:
GOOGLE_APPLICATION_CREDENTIALS
環境変数からJSONキーをロードすることです。 // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
オプションで提供する必要があります。 AccessTokenProvider
、JavaまたはScalaやKotlinなどの他のJVM言語で実装する必要があります。単一のjava.util.String
引数を受け入れる非ARGコンストラクターまたはコンストラクターが必要です。この構成パラメーターは、 gcpAccessTokenProviderConfig
オプションを使用して提供できます。これが提供されていない場合、ARGノーコンストラクターが呼び出されます。実装を含む瓶は、クラスターのクラスパスにある必要があります。 // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
サービスアカウントのなりすましは、特定のユーザー名とグループ名に対して、またはデフォルトで以下のプロパティを使用してすべてのユーザーに対して構成できます。
gcpImpersonationServiceAccountForUser_<USER_NAME>
(デフォルトでは設定されていません)
特定のユーザーのサービスアカウントのなりすまし。
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(デフォルトで設定されていません)
特定のグループのサービスアカウントのなりすまし。
gcpImpersonationServiceAccount
(デフォルトでは設定されていません)
すべてのユーザーのデフォルトのサービスアカウントのなりすまし。
上記のプロパティのいずれかが設定されている場合、指定されたサービスアカウントは、BigQueryにアクセスするときに短命の資格情報を生成することによりなりすまします。
複数のプロパティが設定されている場合、ユーザー名に関連付けられたサービスアカウントは、マッチングユーザーとグループのグループ名に関連付けられたサービスアカウントよりも優先され、デフォルトのサービスアカウントのなりすましよりも優先されます。
アクセストークンの更新が不要な単純なアプリケーションの場合、 gcpAccessToken
構成オプションとしてアクセストークンを渡すことです。 gcloud auth application-default print-access-token
実行することで、アクセストークンを取得できます。
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
重要: CredentialsProvider
とAccessTokenProvider
JavaまたはScalaやKotlinなどの他のJVM言語で実装する必要があります。実装を含む瓶は、クラスターのクラスパスにある必要があります。
通知:上記のオプションの1つのみを提供する必要があります。
フォワードプロキシに接続し、ユーザー資格情報を認証するには、次のオプションを構成します。
proxyAddress
:プロキシサーバーのアドレス。プロキシはHTTPプロキシである必要があり、アドレスはhost:port
形式にある必要があります。
proxyUsername
:プロキシに接続するために使用されるユーザー名。
proxyPassword
:プロキシに接続するために使用されるパスワード。
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
同じプロキシパラメーターを、このようなSparkのRuntimeConfigを使用してグローバルに設定することもできます。
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
Hadoop構成でも次のように設定できます。
fs.gs.proxy.address
(「proxyaddress」に似ています)、 fs.gs.proxy.username
(「proxyusername」に類似)、 fs.gs.proxy.password
(「proxypassword」と同様)。
同じパラメーターが複数の場所に設定されている場合、優先度の順序は次のとおりです。
オプション( "key"、 "value")> spark.conf> hadoop構成