커넥터는 Google BigQuery 테이블을 Spark의 DataFrame으로 읽고 DataFrame을 다시 BigQuery에 쓰는 것을 지원합니다. 이는 Spark SQL 데이터 소스 API를 사용하여 BigQuery와 통신함으로써 수행됩니다.
Storage API는 Google Cloud Storage를 중개자로 사용하지 않고 gRPC를 통해 BigQuery에서 직접 데이터를 병렬로 스트리밍합니다.
일반적으로 더 나은 읽기 성능으로 이어지는 이전 내보내기 기반 읽기 흐름을 사용하는 것보다 여러 가지 장점이 있습니다.
Google Cloud Storage에 임시 파일을 남기지 않습니다. Arrow 또는 Avro 와이어 형식을 사용하여 BigQuery 서버에서 행을 직접 읽습니다.
새로운 API를 사용하면 열 및 조건자 필터링을 통해 관심 있는 데이터만 읽을 수 있습니다.
BigQuery는 열 형식 데이터 저장소로 지원되므로 모든 열을 읽지 않고도 효율적으로 데이터를 스트리밍할 수 있습니다.
Storage API는 조건자 필터의 임의 푸시다운을 지원합니다. 커넥터 버전 0.8.0-베타 이상에서는 BigQuery에 대한 임의 필터 푸시다운을 지원합니다.
Spark에는 중첩된 필드에서 필터 푸시다운을 허용하지 않는 알려진 문제가 있습니다. 예를 들어 address.city = "Sunnyvale"
과 같은 필터는 Bigquery로 푸시다운되지 않습니다.
API는 모두 완료될 때까지 판독기 간의 레코드 균형을 재조정합니다. 이는 모든 맵 단계가 거의 동시에 완료된다는 것을 의미합니다. Google Cloud Dataflow에서 동적 샤딩이 유사하게 사용되는 방법에 대한 이 블로그 기사를 참조하세요.
자세한 내용은 분할 구성을 참조하세요.
다음 지침을 따르십시오.
Apache Spark 환경이 없으면 사전 구성된 인증을 사용하여 Cloud Dataproc 클러스터를 만들 수 있습니다. 다음 예시에서는 Cloud Dataproc을 사용한다고 가정하지만 모든 클러스터에서 spark-submit
사용할 수 있습니다.
API를 사용하는 모든 Dataproc 클러스터에는 'bigquery' 또는 'cloud-platform' 범위가 필요합니다. Dataproc 클러스터에는 기본적으로 'bigquery' 범위가 있으므로 활성화된 프로젝트의 대부분의 클러스터는 기본적으로 작동해야 합니다.
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
최신 버전의 커넥터는 다음 링크에서 공개적으로 제공됩니다.
버전 | 링크 |
---|---|
스파크 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (HTTP 링크) |
스파크 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (HTTP 링크) |
스파크 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (HTTP 링크) |
스파크 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (HTTP 링크) |
스파크 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (HTTP 링크) |
스파크 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (HTTP 링크) |
스칼라 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (HTTP 링크) |
스칼라 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (HTTP 링크) |
스칼라 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (HTTP 링크) |
처음 6개 버전은 Spark의 새로운 데이터 소스 API(Data Source API v2)를 기반으로 구축된 모든 Scala 버전의 Spark 2.4/3.1/3.2/3.3/3.4/3.5를 대상으로 하는 Java 기반 커넥터입니다.
마지막 두 개의 커넥터는 Scala 기반 커넥터입니다. 아래 설명된 대로 Spark 설치와 관련된 jar를 사용하십시오.
커넥터 스파크 | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
스파크-3.5-bigquery | ✓ | |||||||
스파크-3.4-bigquery | ✓ | ✓ | ||||||
스파크-3.3-bigquery | ✓ | ✓ | ✓ | |||||
스파크-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ||||
스파크-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | |||
스파크-2.4-bigquery | ✓ | |||||||
Spark-bigquery-with-종속성_2.13 | ✓ | ✓ | ✓ | ✓ | ||||
Spark-bigquery-with-종속성_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |
Spark-bigquery-with-종속성_2.11 | ✓ | ✓ |
커넥터 Dataproc 이미지 | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | 서버리스 이미지 1.0 | 서버리스 이미지 2.0 | 서버리스 이미지 2.1 | 서버리스 이미지 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
스파크-3.5-bigquery | ✓ | ✓ | ||||||||
스파크-3.4-bigquery | ✓ | ✓ | ✓ | |||||||
스파크-3.3-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
스파크-3.2-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ||||
스파크-3.1-bigquery | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | |||
스파크-2.4-bigquery | ✓ | ✓ | ||||||||
Spark-bigquery-with-종속성_2.13 | ✓ | ✓ | ✓ | |||||||
Spark-bigquery-with-종속성_2.12 | ✓ | ✓ | ✓ | ✓ | ✓ | |||||
Spark-bigquery-with-종속성_2.11 | ✓ | ✓ |
커넥터는 Maven Central 저장소에서도 사용할 수 있습니다. --packages
옵션 또는 spark.jars.packages
구성 속성을 사용하여 사용할 수 있습니다. 다음 값을 사용하십시오
버전 | 커넥터 아티팩트 |
---|---|
스파크 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
스파크 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
스파크 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
스파크 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
스파크 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
스파크 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
스칼라 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
스칼라 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
스칼라 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
이미지 2.1 이상을 사용하여 생성된 Dataproc 클러스터 또는 Dataproc 서버리스 서비스를 사용하는 배치에는 Spark BigQuery 커넥터가 내장되어 있습니다. 이 경우 표준 --jars
또는 --packages
(또는 spark.jars
/ spark.jars.packages
구성)를 사용하면 내장 커넥터가 우선하므로 도움이 되지 않습니다.
내장 버전이 아닌 다른 버전을 사용하려면 다음 중 하나를 수행하십시오.
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
또는 --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
버전을 업그레이드합니다. --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
다른 jar로 클러스터를 만듭니다. URL은 클러스터의 Spark 버전에 대한 유효한 커넥터 JAR을 가리킬 수 있습니다.--properties dataproc.sparkBqConnector.version=0.41.0
또는 --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
다른 jar로 배치를 생성합니다. URL은 런타임의 Spark 버전에 대한 유효한 커넥터 JAR을 가리킬 수 있습니다. 다음을 실행하여 컴파일하지 않고 API에 대해 간단한 PySpark 단어 개수를 실행할 수 있습니다.
Dataproc 이미지 1.5 이상
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
Dataproc 이미지 1.4 이하
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
커넥터는 언어 간 Spark SQL 데이터 소스 API를 사용합니다.
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
또는 Scala 전용 암시적 API:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
자세한 내용은 Python, Scala 및 Java의 추가 코드 샘플을 참조하세요.
커넥터를 사용하면 BigQuery에서 표준 SQL SELECT 쿼리를 실행하고 해당 결과를 Spark 데이터 프레임으로 직접 가져올 수 있습니다. 이는 다음 코드 샘플에 설명된 대로 쉽게 수행됩니다.
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
결과가 나옵니다
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
두 번째 옵션은 다음과 같은 query
옵션을 사용하는 것입니다.
df = spark.read.format("bigquery").option("query", sql).load()
결과만 유선을 통해 전송되므로 실행 속도가 더 빨라집니다. 비슷한 방식으로 쿼리는 Spark에서 조인을 실행하는 것보다 더 효율적으로 JOIN을 포함하거나 하위 쿼리, BigQuery 사용자 정의 함수, 와일드 카드 테이블, BigQuery ML 등과 같은 다른 BigQuery 기능을 사용할 수 있습니다.
이 기능을 사용하려면 다음 구성을 설정해야 합니다.
viewsEnabled
true
로 설정해야 합니다.materializationDataset
GCP 사용자에게 테이블 생성 권한이 있는 데이터세트로 설정되어야 합니다. materializationProject
선택 사항입니다. 참고: BigQuery 문서에 언급된 대로 쿼리된 테이블은 materializationDataset
와 동일한 위치에 있어야 합니다. 또한 SQL statement
테이블이 parentProject
가 아닌 다른 프로젝트의 테이블인 경우 정규화된 테이블 이름(예: [project].[dataset].[table]
을 사용하세요.
중요: 이 기능은 BigQuery에서 쿼리를 실행하고 그 결과를 Spark가 결과를 읽는 임시 테이블에 저장하여 구현됩니다. 이로 인해 BigQuery 계정에 추가 비용이 추가될 수 있습니다.
커넥터는 BigQuery 뷰에서 읽기를 예비적으로 지원합니다. 몇 가지 주의사항이 있습니다.
collect()
또는 count()
작업을 실행하기 전이라도 읽기 성능에 영향을 미칩니다.materializationProject
및 materializationDataset
옵션으로 구성할 수 있습니다. 이러한 옵션은 뷰를 읽기 전에 spark.conf.set(...)
호출하여 전역적으로 설정할 수도 있습니다..option("viewsEnabled", "true")
)를 읽을 때 viewsEnabled 옵션을 설정하거나 spark.conf.set("viewsEnabled", "true")
호출하여 전역적으로 설정하세요.materializationDataset
는 뷰와 동일한 위치에 있어야 합니다.BigQuery에 DataFrame을 쓰는 작업은 직접 및 간접이라는 두 가지 방법을 사용하여 수행할 수 있습니다.
이 방법에서는 BigQuery Storage Write API를 사용하여 데이터가 BigQuery에 직접 기록됩니다. 이 옵션을 활성화하려면 아래와 같이 writeMethod
옵션을 direct
로 설정하십시오.
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
APPEND 저장 모드 및 OVERWRITE 모드(날짜 및 범위로 파티션을 나눈 경우에만)에서 파티션을 나눈 기존 테이블(날짜로 파티션을 나눈, 수집 시간으로 파티션을 나눈, 범위로 파티션을 나눈) 쓰기는 커넥터와 BigQuery Storage Write API에서 완벽하게 지원됩니다. 아래에 설명된 datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
사용은 현재 직접 쓰기 방법에서 지원되지 않습니다.
중요: BigQuery Storage Write API 가격에 대해서는 데이터 수집 가격 페이지를 참조하세요.
중요: 이전 버전에는 경우에 따라 테이블이 삭제될 수 있는 버그가 있으므로 직접 쓰기에는 버전 0.24.2 이상을 사용하십시오.
이 방법에서는 데이터가 먼저 GCS에 기록된 다음 BigQuery에 로드됩니다. 임시 데이터 위치를 나타내도록 GCS 버킷을 구성해야 합니다.
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
데이터는 Apache Parquet, Apache ORC 또는 Apache Avro 형식을 사용하여 임시 저장됩니다.
GCS 버킷과 형식은 다음과 같이 Spark의 RuntimeConfig를 사용하여 전역적으로 설정할 수도 있습니다.
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
DataFrame을 BigQuery로 스트리밍할 때 각 배치는 비스트리밍 DataFrame과 동일한 방식으로 작성됩니다. HDFS 호환 체크포인트 위치(예: path/to/HDFS/dir
또는 gs://checkpoint-bucket/checkpointDir
)를 지정해야 합니다.
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
중요: 커넥터는 다른 GCS 커넥터(있는 경우)와의 충돌을 피하기 위해 GCS 커넥터를 구성하지 않습니다. 커넥터의 쓰기 기능을 사용하려면 여기에 설명된 대로 클러스터에서 GCS 커넥터를 구성하세요.
API는 읽기 구성을 위한 다양한 옵션을 지원합니다.
<style> table#propertytable td, 테이블 th { word-break:break-word } </style>재산 | 의미 | 용법 |
---|---|---|
table | [[project:]dataset.]table 형식의 BigQuery 테이블입니다. 대신 load() / save() 의 path 매개변수를 사용하는 것이 좋습니다. 이 옵션은 더 이상 사용되지 않으며 향후 버전에서는 제거될 예정입니다.(더 이상 사용되지 않음) | 읽기/쓰기 |
dataset | 테이블이 포함된 데이터 세트입니다. 이 옵션은 표준 테이블 및 뷰와 함께 사용해야 하지만 쿼리 결과를 로드할 때는 사용할 수 없습니다. ( table 에서 생략하지 않는 한 선택 사항) | 읽기/쓰기 |
project | 테이블의 Google Cloud 프로젝트 ID입니다. 이 옵션은 표준 테이블 및 뷰와 함께 사용해야 하지만 쿼리 결과를 로드할 때는 사용할 수 없습니다. (선택사항. 기본값은 사용 중인 서비스 계정의 프로젝트) | 읽기/쓰기 |
parentProject | 내보내기 비용을 청구할 테이블의 Google Cloud 프로젝트 ID입니다. (선택사항. 기본값은 사용 중인 서비스 계정의 프로젝트) | 읽기/쓰기 |
maxParallelism | 데이터를 분할할 최대 파티션 수입니다. BigQuery에서 데이터가 충분히 작다고 판단하면 실제 숫자는 더 적을 수도 있습니다. 파티션당 판독기를 예약할 실행자가 충분하지 않은 경우 일부 파티션이 비어 있을 수 있습니다. 중요: 이전 매개변수( parallelism )는 계속 지원되지만 더 이상 사용되지 않는 모드입니다. 커넥터 버전 1.0에서는 제거될 예정입니다.(선택 사항. 기본적으로는 PreferredMinParallelism과 20,000 중 더 큰 값으로 설정됩니다.) | 읽다 |
preferredMinParallelism | 데이터를 분할할 기본 최소 파티션 수입니다. BigQuery에서 데이터가 충분히 작다고 판단하면 실제 숫자는 더 적을 수도 있습니다. 파티션당 판독기를 예약할 실행자가 충분하지 않은 경우 일부 파티션이 비어 있을 수 있습니다. (선택 사항. 기본값은 애플리케이션의 기본 병렬 처리 및 maxParallelism의 3배 중 가장 작은 값입니다.) | 읽다 |
viewsEnabled | 커넥터가 테이블뿐만 아니라 보기에서도 읽을 수 있도록 합니다. 이 옵션을 활성화하기 전에 관련 섹션을 읽어 보십시오. (선택사항. 기본값은 false ) | 읽다 |
materializationProject | 구체화된 뷰가 생성될 프로젝트 ID (선택사항. 기본값은 보기의 프로젝트 ID입니다) | 읽다 |
materializationDataset | 구체화된 뷰가 생성될 데이터세트입니다. 이 데이터 세트는 뷰 또는 쿼리된 테이블과 동일한 위치에 있어야 합니다. (선택사항. 기본값은 뷰의 데이터세트입니다) | 읽다 |
materializationExpirationTimeInMinutes | 뷰 또는 쿼리의 구체화된 데이터를 보유하는 임시 테이블의 만료 시간(분)입니다. 커넥터는 로컬 캐시 사용 및 BigQuery 계산을 줄이기 위해 임시 테이블을 재사용할 수 있으므로 값이 너무 낮으면 오류가 발생할 수 있습니다. 값은 양의 정수여야 합니다. (선택 사항. 기본값은 1440 또는 24시간) | 읽다 |
readDataFormat | BigQuery에서 읽기 위한 데이터 형식입니다. 옵션 : ARROW , AVRO (선택사항. 기본값은 ARROW ) | 읽다 |
optimizedEmptyProjection | 커넥터는 count() 실행에 사용되는 최적화된 빈 프로젝션(열 없이 선택) 논리를 사용합니다. 이 논리는 테이블 메타데이터에서 직접 데이터를 가져오거나 필터가 있는 경우 훨씬 효율적인 `SELECT COUNT(*) WHERE...`를 수행합니다. 이 옵션을 false 로 설정하면 이 논리의 사용을 취소할 수 있습니다.(선택사항, 기본값은 true ) | 읽다 |
pushAllFilters | true 로 설정하면 커넥터는 Spark가 BigQuery Storage API에 위임할 수 있는 모든 필터를 푸시합니다. 이렇게 하면 BigQuery Storage API 서버에서 Spark 클라이언트로 전송해야 하는 데이터 양이 줄어듭니다. 이 옵션은 더 이상 사용되지 않으며 향후 버전에서는 제거될 예정입니다.(선택사항, 기본값은 true )(더 이상 사용되지 않음) | 읽다 |
bigQueryJobLabel | 커넥터가 시작한 쿼리에 라벨을 추가하고 BigQuery 작업을 로드하는 데 사용할 수 있습니다. 여러 라벨을 설정할 수 있습니다. (선택 과목) | 읽다 |
bigQueryTableLabel | 테이블에 쓰는 동안 테이블에 레이블을 추가하는 데 사용할 수 있습니다. 여러 라벨을 설정할 수 있습니다. (선택 과목) | 쓰다 |
traceApplicationName | BigQuery Storage 읽기 및 쓰기 세션을 추적하는 데 사용되는 애플리케이션 이름입니다. 세션에서 추적 ID를 설정하려면 애플리케이션 이름을 설정해야 합니다. (선택 과목) | 읽다 |
traceJobId | BigQuery Storage 읽기 및 쓰기 세션을 추적하는 데 사용되는 작업 ID입니다. (선택사항, 기본적으로 Dataproc 작업 ID가 존재합니다. 그렇지 않으면 Spark 애플리케이션 ID를 사용합니다.) | 읽다 |
createDisposition | 작업에서 새 테이블을 생성할 수 있는지 여부를 지정합니다. 허용되는 값은 다음과 같습니다.
(선택 사항. 기본값은 CREATE_IF_NEEDED입니다). | 쓰다 |
writeMethod | BigQuery에 데이터가 기록되는 방법을 제어합니다. 사용 가능한 값은 BigQuery Storage Write API를 사용하는 direct 값과 먼저 GCS에 데이터를 쓴 후 BigQuery 로드 작업을 트리거하는 indirect 입니다. 여기에서 자세한 내용을 확인하세요(선택사항, 기본값은 indirect ) | 쓰다 |
writeAtLeastOnce | 데이터가 BigQuery에 한 번 이상 기록되도록 보장합니다. 이는 정확히 한 번만 보장하는 것보다 보장이 적습니다. 이는 데이터가 작은 배치로 지속적으로 기록되는 스트리밍 시나리오에 적합합니다. (선택사항. 기본값은 false )'DIRECT' 쓰기 방법에서만 지원되며 모드는 '덮어쓰기'가 아닙니다 . | 쓰다 |
temporaryGcsBucket | BigQuery에 데이터가 로드되기 전에 데이터를 임시로 보관하는 GCS 버킷입니다. Spark 구성( spark.conf.set(...) )에 설정되지 않은 경우 필수입니다.`DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
persistentGcsBucket | BigQuery에 로드되기 전에 데이터를 보관하는 GCS 버킷입니다. 알림을 받은 경우 BigQuery에 데이터를 쓴 후에는 데이터가 삭제되지 않습니다. `DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
persistentGcsPath | BigQuery에 로드되기 전에 데이터를 보관하는 GCS 경로입니다. persistentGcsBucket 에만 사용됩니다.`DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
intermediateFormat | BigQuery에 로드되기 전의 데이터 형식으로, 값은 'parquet', 'orc' 또는 'avro'일 수 있습니다. Avro 형식을 사용하려면 런타임에 Spark-avro 패키지를 추가해야 합니다. (선택 사항. 기본값은 parquet 입니다). 쓰기 전용입니다. 'INDIRECT' 쓰기 방법에만 지원됩니다. | 쓰다 |
useAvroLogicalTypes | Avro(`.option("intermediateFormat", "avro")`)에서 로드할 때 BigQuery는 [기본적으로] 논리 유형 대신 기본 Avro 유형을 사용합니다(https://cloud.google.com/bigquery/docs/ 로딩-데이터-클라우드-스토리지-avro#logical_types). 이 옵션을 제공하면 Avro 논리 유형이 해당 BigQuery 데이터 유형으로 변환됩니다. (선택 사항. 기본값은 false 입니다). 쓰기 전용입니다. | 쓰다 |
datePartition | 데이터가 기록될 날짜 파티션입니다. YYYYMMDD 형식으로 지정된 날짜 문자열이어야 합니다. 다음과 같이 단일 파티션의 데이터를 덮어쓰는 데 사용할 수 있습니다.
(선택 과목). 쓰기 전용입니다. 다음과 같은 다양한 파티션 유형과 함께 사용할 수도 있습니다. 시간: YYYYMMDDHH 월: YYYYMM 연도: YYYY `DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
partitionField | 이 필드가 지정되면 테이블이 이 필드로 분할됩니다. 시간 분할의 경우 `partitionType` 옵션과 함께 지정합니다. 정수 범위 파티셔닝의 경우 `partitionRangeStart`, `partitionRangeEnd,`partitionRangeInterval`의 3가지 옵션과 함께 지정합니다. 필드는 시간 분할의 경우 최상위 TIMESTAMP 또는 DATE 필드여야 하고, 정수 범위 분할의 경우 INT64여야 합니다. 해당 모드는 NULLABLE 또는 REQUIRED 여야 합니다. 시간으로 파티션을 나눈 테이블에 대해 옵션이 설정되지 않은 경우 테이블은 '_PARTITIONTIME' as TIMESTAMP 또는 '_PARTITIONDATE' as DATE 통해 참조되는 유사 열로 파티션이 지정됩니다.(선택 과목). `DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
partitionExpirationMs | 테이블의 파티션에 대한 스토리지를 유지하는 시간(밀리초)입니다. 파티션의 스토리지에는 해당 파티션 시간에 이 값을 더한 만료 시간이 있습니다. (선택 과목). `DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
partitionType | 시간 분할을 지정하는 데 사용됩니다. 지원되는 유형은 HOUR, DAY, MONTH, YEAR 입니다.이 옵션은 시간 분할 대상 테이블에 필수 입니다. (선택 사항. PartitionField가 지정된 경우 기본값은 DAY입니다.) `DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | 정수 범위 분할을 지정하는 데 사용됩니다. 이러한 옵션은 대상 테이블을 정수 범위로 분할하려면 필수 입니다. 3가지 옵션을 모두 지정해야 합니다. `DIRECT` 쓰기 방법에서는 지원되지 않습니다. | 쓰다 |
clusteredFields | 쉼표로 구분된 반복되지 않는 최상위 열의 문자열입니다. (선택 과목). | 쓰다 |
allowFieldAddition | BigQuery LoadJob에 ALLOW_FIELD_ADDITION SchemaUpdateOption을 추가합니다. 허용되는 값은 true 및 false 입니다.(선택 사항. 기본값은 false 입니다).`INDIRECT` 쓰기 방법에서만 지원됩니다. | 쓰다 |
allowFieldRelaxation | BigQuery LoadJob에 ALLOW_FIELD_RELAXATION SchemaUpdateOption을 추가합니다. 허용되는 값은 true 및 false 입니다.(선택 사항. 기본값은 false 입니다).`INDIRECT` 쓰기 방법에서만 지원됩니다. | 쓰다 |
proxyAddress | 프록시 서버의 주소입니다. 프록시는 HTTP 프록시여야 하며 주소는 '호스트:포트' 형식이어야 합니다. Spark 구성( spark.conf.set(...) ) 또는 Hadoop 구성( fs.gs.proxy.address )에서 대신 설정할 수 있습니다.(선택사항. 프록시를 통해 GCP에 연결하는 경우에만 필요합니다.) | 읽기/쓰기 |
proxyUsername | 프록시에 연결하는 데 사용되는 userName입니다. Spark 구성( spark.conf.set(...) ) 또는 Hadoop 구성( fs.gs.proxy.username )에서 대신 설정할 수 있습니다.(선택사항. 인증을 통해 프록시를 통해 GCP에 연결하는 경우에만 필요합니다.) | 읽기/쓰기 |
proxyPassword | 프록시에 연결하는 데 사용되는 비밀번호입니다. Spark 구성( spark.conf.set(...) ) 또는 Hadoop 구성( fs.gs.proxy.password )에서 대신 설정할 수 있습니다.(선택사항. 인증을 통해 프록시를 통해 GCP에 연결하는 경우에만 필요합니다.) | 읽기/쓰기 |
httpMaxRetry | BigQuery에 대한 하위 수준 HTTP 요청의 최대 재시도 횟수입니다. Spark 구성( spark.conf.set("httpMaxRetry", ...) ) 또는 Hadoop 구성( fs.gs.http.max.retry )에서 대안으로 설정할 수 있습니다.(선택사항. 기본값은 10) | 읽기/쓰기 |
httpConnectTimeout | BigQuery와의 연결 설정을 위한 제한 시간(밀리초)입니다. Spark 구성( spark.conf.set("httpConnectTimeout", ...) ) 또는 Hadoop 구성( fs.gs.http.connect-timeout )에서 대신 설정할 수 있습니다.(선택 사항. 기본값은 60000ms입니다. 무한 시간 초과의 경우 0, 20000의 경우 음수) | 읽기/쓰기 |
httpReadTimeout | 설정된 연결에서 데이터를 읽는 데 걸리는 시간 제한(밀리초)입니다. Spark 구성( spark.conf.set("httpReadTimeout", ...) ) 또는 Hadoop 구성( fs.gs.http.read-timeout )에서 대신 설정할 수 있습니다.(선택 사항. 기본값은 60000ms입니다. 무한 시간 초과의 경우 0, 20000의 경우 음수) | 읽다 |
arrowCompressionCodec | Arrow 형식을 사용할 때 BigQuery 테이블에서 읽는 동안의 압축 코덱입니다. 옵션: ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED . Java를 사용하는 경우 권장되는 압축 코덱은 ZSTD 입니다.(선택 사항. 기본값은 압축이 사용되지 않음을 의미하는 COMPRESSION_UNSPECIFIED 입니다) | 읽다 |
responseCompressionCodec | ReadRowsResponse 데이터를 압축하는 데 사용되는 압축 코덱입니다. 옵션: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 (선택 사항. 기본값은 압축이 사용되지 않음을 의미하는 RESPONSE_COMPRESSION_CODEC_UNSPECIFIED 입니다) | 읽다 |
cacheExpirationTimeInMinutes | 쿼리 정보를 저장하는 메모리 내 캐시의 만료 시간입니다. 캐싱을 비활성화하려면 값을 0으로 설정합니다. (선택사항. 기본값은 15분) | 읽다 |
enableModeCheckForSchemaFields | DIRECT 쓰기 중에 대상 스키마의 모든 필드 모드가 해당 소스 필드 스키마의 모드와 동일한지 확인합니다. 기본값은 true입니다. 즉, 검사가 기본적으로 수행됩니다. false로 설정하면 모드 확인이 무시됩니다. | 쓰다 |
enableListInference | 모드가 Parquet인 경우 특히 스키마 추론을 사용할지 여부를 나타냅니다(https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions). 기본값은 거짓입니다. | 쓰다 |
bqChannelPoolSize | BigQueryReadClient에서 생성된 gRPC 채널 풀의 (고정된) 크기입니다. 최적의 성능을 위해서는 최소한 클러스터 실행기의 코어 수로 설정해야 합니다. | 읽다 |
createReadSessionTimeoutInSeconds | 테이블을 읽을 때 ReadSession을 생성하는 시간 초과(초)입니다. 매우 큰 테이블의 경우 이 값을 늘려야 합니다. (선택사항. 기본값은 600초) | 읽다 |
queryJobPriority | BigQuery 쿼리에서 데이터를 읽는 동안 작업에 설정된 우선순위 수준입니다. 허용되는 값은 다음과 같습니다.
(선택사항. 기본값은 INTERACTIVE ) | 읽기/쓰기 |
destinationTableKmsKeyName | 대상 BigQuery 테이블을 보호하는 데 사용될 Cloud KMS 암호화 키를 설명합니다. 프로젝트와 연결된 BigQuery 서비스 계정에는 이 암호화 키에 대한 액세스 권한이 필요합니다. BigQuery에서 CMEK를 사용하는 방법에 대한 자세한 내용은 [여기](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id)를 참조하세요. 참고: 테이블은 커넥터에서 생성된 경우에만 키로 암호화됩니다. 기존에 암호화되지 않은 테이블은 이 옵션을 설정해도 암호화되지 않습니다. (선택 과목) | 쓰다 |
allowMapTypeConversion | 레코드에 필드 이름이 key 및 value 인 두 개의 하위 필드가 있는 경우 BigQuery 레코드에서 Spark MapType으로의 변환을 비활성화하는 부울 구성입니다. 기본값은 변환을 허용하는 true 입니다.(선택 과목) | 읽다 |
spark.sql.sources.partitionOverwriteMode | 테이블이 범위/시간으로 분할된 경우 쓰기 시 덮어쓰기 모드를 지정하는 구성입니다. 현재 지원되는 두 가지 모드: STATIC 및 DYNAMIC . STATIC 모드에서는 전체 테이블을 덮어씁니다. DYNAMIC 모드에서는 기존 테이블의 파티션이 데이터를 덮어씁니다. 기본값은 STATIC 입니다.(선택 과목) | 쓰다 |
enableReadSessionCaching | 읽기 세션 캐싱을 비활성화하는 부울 구성입니다. 더 빠른 Spark 쿼리 계획을 위해 BigQuery 읽기 세션을 캐시합니다. 기본값은 true 입니다.(선택 과목) | 읽다 |
readSessionCacheDurationMins | 읽기 세션 캐싱 기간을 분 단위로 설정하도록 구성합니다. enableReadSessionCaching 이 true (기본값)인 경우에만 작동합니다. 읽기 세션을 캐시하는 기간을 지정할 수 있습니다. 최대 허용 값은 300 입니다. 기본값은 5 입니다.(선택 과목) | 읽다 |
bigQueryJobTimeoutInMinutes | BigQuery 작업 제한 시간을 분 단위로 설정하도록 구성합니다. 기본값은 360 분입니다.(선택 과목) | 읽기/쓰기 |
snapshotTimeMillis | 테이블 스냅샷을 읽는 데 사용하기 위해 밀리초 단위로 지정된 타임스탬프입니다. 기본적으로 이는 설정되지 않으며 최신 버전의 테이블을 읽습니다. (선택 과목) | 읽다 |
bigNumericDefaultPrecision | BigQuery 기본값이 Spark에 비해 너무 넓기 때문에 BigNumeric 필드에 대한 대체 기본 정밀도입니다. 값은 1에서 38 사이일 수 있습니다. 이 기본값은 필드에 매개변수화되지 않은 BigNumeric 유형이 있는 경우에만 사용됩니다. 실제 데이터의 정밀도가 지정된 값보다 높을 경우 데이터가 손실될 수 있으므로 주의하시기 바랍니다. (선택 과목) | 읽기/쓰기 |
bigNumericDefaultScale | BigNumeric 필드에 대한 대체 기본 배율입니다. 값은 0에서 38 사이일 수 있으며 bigNumericFieldsPrecision보다 작을 수 있습니다. 이 기본값은 필드에 매개변수화되지 않은 BigNumeric 유형이 있는 경우에만 사용됩니다. 실제 데이터의 규모가 지정된 것보다 클 경우 데이터가 손실될 수 있으므로 주의하시기 바랍니다. (선택 과목) | 읽기/쓰기 |
spark-submit
의 --conf
매개변수 또는 gcloud dataproc submit spark
의 --properties
매개변수를 사용하여 코드 외부에서 옵션을 설정할 수도 있습니다. 이를 사용하려면 접두사 spark.datasource.bigquery.
예를 들어 spark.conf.set("temporaryGcsBucket", "some-bucket")
--conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
으로 설정할 수도 있습니다.
DATETIME
및 TIME
제외한 모든 BigQuery 데이터 유형은 해당 Spark SQL 데이터 유형에 매핑됩니다. 모든 매핑은 다음과 같습니다.
BigQuery 표준 SQL 데이터 유형 | 스파크 SQL 데이터 유형 | 메모 |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | Numeric 및 BigNumeric 지원을 참조하세요. |
BIGNUMERIC | DecimalType | Numeric 및 BigNumeric 지원을 참조하세요. |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark에는 DATETIME 유형이 없습니다. Spark 문자열은 BQ DATETIME 리터럴 형식인 경우 기존 BQ DATETIME 열에 쓸 수 있습니다. * Spark 3.4+의 경우 BQ DATETIME은 Spark의 TimestampNTZ 유형, 즉 java LocalDateTime으로 읽혀집니다. |
TIME | LongType , StringType * | Spark에는 TIME 유형이 없습니다. 자정 이후 마이크로초를 나타내는 생성된 long은 TimestampType으로 안전하게 캐스팅될 수 있지만 이로 인해 날짜가 현재 날짜로 추론됩니다. 따라서 시간은 길게 남겨지고 사용자는 원하는 경우 캐스팅할 수 있습니다. Timestamp TIME으로 캐스팅할 때 DATETIME과 동일한 TimeZone 문제가 있습니다. * Spark 문자열은 BQ TIME 리터럴 형식인 경우 기존 BQ TIME 열에 쓸 수 있습니다. |
JSON | StringType | Spark에는 JSON 유형이 없습니다. 값은 문자열로 읽혀집니다. JSON을 BigQuery에 다시 작성하려면 다음 조건이 필요합니다 .
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery에는 MAP 유형이 없으므로 Apache Avro 및 BigQuery 로드 작업과 같은 다른 변환과 유사하게 커넥터는 Spark 맵을 REPEATED STRUCT<key,value>로 변환합니다. 즉, 지도 쓰기 및 읽기는 가능하지만 지도 의미 체계를 사용하는 BigQuery에서 SQL 실행은 지원되지 않습니다. BigQuery SQL을 사용하여 지도의 값을 참조하려면 BigQuery 문서를 확인하세요. 이러한 비호환성으로 인해 몇 가지 제한 사항이 적용됩니다.
|
Spark ML Vector 및 Matrix는 밀집 버전과 희소 버전을 포함하여 지원됩니다. 데이터는 BigQuery RECORD로 저장됩니다. 필드의 스파크 유형을 포함하는 필드 설명에 접미사가 추가됩니다.
이러한 유형을 BigQuery에 작성하려면 ORC 또는 Avro 중간 형식을 사용하고 해당 유형을 행의 열(즉, 구조체의 필드가 아님)로 사용하세요.
BigQuery의 BigNumeric의 정밀도는 76.76(77번째 숫자는 부분)이고 소수 자릿수는 38입니다. 이 정밀도와 소수 자릿수는 Spark의 DecimalType(38 소수 자릿수 및 38 정밀도) 지원 범위를 벗어나므로 정밀도가 38보다 큰 BigNumeric 필드는 사용할 수 없습니다. . 이 Spark 제한 사항이 업데이트되면 그에 따라 커넥터도 업데이트됩니다.
Spark Decimal/BigQuery Numeric 변환은 유형의 매개변수화를 유지하려고 시도합니다. 즉, NUMERIC(10,2)
Decimal(10,2)
로 변환되고 그 반대의 경우도 마찬가지입니다. 그러나 매개변수가 손실되는 경우도 있습니다. 즉, 매개변수가 기본값인 NUMERIC(38,9) 및 BIGNUMERIC(76,38)으로 되돌아갑니다. 즉, 현재 BigNumeric 읽기는 표준 테이블에서만 지원되고 BigQuery 뷰나 BigQuery 쿼리에서 데이터를 읽을 때는 지원되지 않습니다.
커넥터는 자동으로 열을 계산하고 DataFrame의 SELECT
문을 푸시다운 필터링합니다.
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
필터를 열 word
로 필터링하고 조건자 필터 word = 'hamlet' or word = 'Claudius'
아래로 푸시했습니다.
BigQuery에 여러 번 읽기 요청을 하지 않으려면 필터링하기 전에 DataFrame을 캐시하면 됩니다. 예:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
자동 푸시다운을 재정의하고 Spark가 클라이언트에서 나머지 필터링을 수행하는 filter
옵션을 수동으로 지정할 수도 있습니다.
유사 열 _PARTITIONDATE 및 _PARTITIONTIME은 테이블 스키마의 일부가 아닙니다. 따라서 분할된 테이블의 파티션별로 쿼리하려면 위에 표시된 where() 메소드를 사용하지 마십시오. 대신 다음과 같은 방식으로 필터 옵션을 추가하세요.
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
기본적으로 커넥터는 필터링 전 읽고 있는 테이블에 400MB당 하나의 파티션을 생성합니다. 이는 BigQuery Storage API에서 지원하는 최대 리더 수와 대략적으로 일치해야 합니다. 이는 maxParallelism
속성을 사용하여 명시적으로 구성할 수 있습니다. BigQuery는 서버 제약조건에 따라 파티션 수를 제한할 수 있습니다.
BigQuery 리소스 사용량 추적을 지원하기 위해 커넥터는 BigQuery 리소스에 태그를 지정하는 다음 옵션을 제공합니다.
커넥터는 BigQuery 로드 및 쿼리 작업을 실행할 수 있습니다. 작업에 라벨을 추가하는 방법은 다음과 같습니다.
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
그러면 cost_center
= analytics
및 usage
= nightly_etl
라벨이 생성됩니다.
읽기 및 쓰기 세션에 주석을 추가하는 데 사용됩니다. 추적 ID는 Spark:ApplicationName:JobID
형식입니다. 이는 선택 옵션이며 이를 사용하려면 사용자가 traceApplicationName
속성을 설정해야 합니다. JobID는 Spark 애플리케이션 ID(예: application_1648082975639_0001
)로 대체되어 Dataproc 작업 ID에 의해 자동 생성됩니다. traceJobId
옵션을 설정하여 작업 ID를 재정의할 수 있습니다. 추적 ID의 총 길이는 256자를 초과할 수 없습니다.
커넥터는 Spark 클러스터에 설치되지 않은 경우에도 Jupyter Notebook에서 사용할 수 있습니다. 다음 코드를 사용하여 외부 jar로 추가할 수 있습니다.
파이썬:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
스칼라:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
Spark 클러스터가 Scala 2.12(Spark 2.4.x에서는 선택 사항, 3.0.x에서는 필수)를 사용하는 경우 관련 패키지는 com.google.cloud.spark:spark-bigquery-with-dependent_ 2.12 :0.41.0입니다. 어떤 Scala 버전이 사용되는지 확인하려면 다음 코드를 실행하세요.
파이썬:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
스칼라:
scala . util . Properties . versionString
암시적 Scala API spark.read.bigquery("TABLE_ID")
사용하려는 경우가 아니면 커넥터에 대해 컴파일할 필요가 없습니다.
프로젝트에 커넥터를 포함하려면:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark는 Spark 기록 페이지에서 최종 사용자가 찾을 수 있는 많은 측정항목을 채웁니다. 그러나 이러한 모든 측정항목은 커넥터에서 변경하지 않고 암시적으로 수집되는 스파크와 관련되어 있습니다. 그러나 BigQuery에서 채워지고 현재 드라이버/실행기 로그에서 읽을 수 있는 애플리케이션 로그에 표시되는 측정항목은 거의 없습니다.
Spark 3.2부터 Spark는 Spark UI 페이지 https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric에서 사용자 정의 메트릭을 노출하는 API를 제공했습니다. /CustomMetric.html
현재 커넥터는 이 API를 사용하여 읽기 중에 다음과 같은 BigQuery 측정항목을 노출합니다.
<스타일> 테이블#metricstable td, 표 th {Word-Break : Break-Word} </style>측정항목 이름 | 설명 |
---|---|
bytes read | BigQuery 바이트 수를 읽습니다 |
rows read | BigQuery 행의 수를 읽습니다 |
scan time | 모든 집행자에서 획득하도록 요청 된 읽기 줄 사이에 소비 된 시간은 밀리 초입니다. |
parse time | 행을 구문 분석하는 데 소비 된 시간은 모든 집행자에 걸쳐 밀리 초로 읽습니다. |
spark time | 모든 executors에서 밀리 초로 쿼리 (즉, 스캔 및 구문 분석을 제외하고)를 처리하기 위해 Spark에서 소비 된 시간. |
참고 : Spark UI 페이지에서 메트릭을 사용하려면 spark-bigquery-metrics-0.41.0.jar
히스토리 서버를 시작하기 전에 클래스 경로인지 확인해야하며 커넥터 버전은 spark-3.2
이상입니다.
BigQuery 가격 책정 문서를 참조하십시오.
maxParallelism
속성으로 파티션 수를 수동으로 설정할 수 있습니다. BigQuery는 귀하가 요구하는 것보다 더 적은 파티션을 제공 할 수 있습니다. 파티셔닝 구성을 참조하십시오.
Spark에서 읽은 후에는 항상 반복 할 수 있습니다.
파티션이 너무 많으면 CreateWritestream 또는 처리량 할당량을 초과 할 수 있습니다. 이는 각 파티션 내의 데이터가 일련의 처리되는 동안 독립적 인 파티션이 스파크 클러스터 내의 다른 노드에서 병렬로 처리 될 수 있기 때문에 발생합니다. 일반적으로 최대 지속적인 처리량을 보장하려면 할당량 증가 요청을 제출해야합니다. 그러나이 문제를 완화하기 위해 데이터 프레임에서 coalesce
호출하여 작성되는 파티션 수를 수동으로 줄일 수 있습니다.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
경험의 규칙은 단일 파티션 핸들이 최소 1GB의 데이터를 갖는 것입니다.
또한 writeAtLeastOnce
속성으로 실행되는 작업은 CreateWritestream 쿼터 오류에 맞지 않습니다.
커넥터는 BigQuery API에 연결하려면 GoogleCredentials 인스턴스가 필요합니다. 제공하기위한 여러 가지 옵션이 있습니다.
GOOGLE_APPLICATION_CREDENTIALS
환경 변수에서 JSON 키를로드하는 것입니다. // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
옵션에 제공되어야합니다. AccessTokenProvider
Java 또는 Scala 또는 Kotlin과 같은 다른 JVM 언어로 구현해야합니다. ARG 생성자가 없거나 단일 java.util.String
인수를 수락하는 생성자가 있어야합니다. 이 구성 매개 변수는 gcpAccessTokenProviderConfig
옵션을 사용하여 제공 할 수 있습니다. 이것이 제공되지 않으면, 비 아그 생성자가 호출됩니다. 구현이 포함 된 항아리는 클러스터의 클래스 경로에 있어야합니다. // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
서비스 계정 가장은 특정 사용자 이름 및 그룹 이름 또는 아래 속성을 사용하여 기본적으로 모든 사용자에 대해 구성 할 수 있습니다.
gcpImpersonationServiceAccountForUser_<USER_NAME>
(기본적으로 설정되지 않음)
특정 사용자에 대한 서비스 계정 가장.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(기본적으로 설정되지 않음)
특정 그룹의 서비스 계정 가장.
gcpImpersonationServiceAccount
(기본적으로 설정되지 않음)
모든 사용자를위한 기본 서비스 계정 가장.
위의 속성 중 하나가 설정되면 BigQuery에 액세스 할 때 단기간 자격 증명을 생성하여 지정된 서비스 계정이 가장합니다.
둘 이상의 속성이 설정되면 사용자 이름과 관련된 서비스 계정은 일치하는 사용자 및 그룹의 그룹 이름과 관련된 서비스 계정보다 우선하며, 이는 기본 서비스 계정에 입국보다 우선합니다.
액세스 토큰 새로 고침이 필요하지 않은 간단한 응용 프로그램의 경우 또 다른 대안은 액세스 토큰을 gcpAccessToken
구성 옵션으로 전달하는 것입니다. gcloud auth application-default print-access-token
실행하여 액세스 토큰을 얻을 수 있습니다.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
중요 : CredentialsProvider
및 AccessTokenProvider
Java 또는 Scala 또는 Kotlin과 같은 기타 JVM 언어로 구현해야합니다. 구현이 포함 된 항아리는 클러스터의 클래스 경로에 있어야합니다.
통지 : 위의 옵션 중 하나만 제공해야합니다.
전방 프록시에 연결하고 사용자 자격 증명을 인증하려면 다음 옵션을 구성하십시오.
proxyAddress
: 프록시 서버의 주소. 프록시는 HTTP 프록시 여야하며 주소는 host:port
형식이어야합니다.
proxyUsername
: 프록시에 연결하는 데 사용되는 사용자 이름입니다.
proxyPassword
: 프록시에 연결하는 데 사용되는 비밀번호.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
Spark의 runtimeConfig를 사용하여 동일한 프록시 매개 변수를 전 세계적으로 설정할 수도 있습니다.
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
Hadoop 구성에서도 다음을 설정할 수도 있습니다.
fs.gs.proxy.address
( "proxyaddress"와 유사), fs.gs.proxy.username
( "proxyusername과 유사) 및 fs.gs.proxy.password
("proxypassword "와 유사).
동일한 매개 변수가 여러 위치에서 설정되면 우선 순위 순서는 다음과 같습니다.
옵션 ( "키", "값")> spark.conf> hadoop 구성