ตัวเชื่อมต่อรองรับการอ่านตาราง Google BigQuery ลงใน DataFrames ของ Spark และการเขียน DataFrames กลับเข้าไปใน BigQuery ซึ่งทำได้โดยใช้ Spark SQL Data Source API เพื่อสื่อสารกับ BigQuery
Storage API สตรีมข้อมูลแบบขนานโดยตรงจาก BigQuery ผ่าน gRPC โดยไม่ต้องใช้ Google Cloud Storage เป็นตัวกลาง
มีข้อดีหลายประการเหนือการใช้โฟลว์การอ่านตามการส่งออกก่อนหน้านี้ ซึ่งโดยทั่วไปแล้วควรนำไปสู่ประสิทธิภาพการอ่านที่ดีขึ้น:
มันไม่ทิ้งไฟล์ชั่วคราวใด ๆ ไว้ใน Google Cloud Storage แถวจะอ่านได้โดยตรงจากเซิร์ฟเวอร์ BigQuery โดยใช้รูปแบบ Wire Arrow หรือ Avro
API ใหม่อนุญาตให้การกรองคอลัมน์และภาคแสดงอ่านเฉพาะข้อมูลที่คุณสนใจเท่านั้น
เนื่องจาก BigQuery ได้รับการสนับสนุนโดยพื้นที่เก็บข้อมูลแบบเรียงเป็นแนว จึงสตรีมข้อมูลได้อย่างมีประสิทธิภาพโดยไม่ต้องอ่านคอลัมน์ทั้งหมด
Storage API รองรับการกดลงของตัวกรองภาคแสดงโดยพลการ เครื่องมือเชื่อมต่อเวอร์ชัน 0.8.0 เบต้าขึ้นไปรองรับการกดลงของตัวกรองที่กำหนดเองไปยัง Bigquery
มีปัญหาที่ทราบแล้วใน Spark ซึ่งไม่อนุญาตให้กดตัวกรองลงในฟิลด์ที่ซ้อนกัน ตัวอย่างเช่น - ตัวกรองเช่น address.city = "Sunnyvale"
จะไม่ได้รับการเลื่อนลงไปยัง Bigquery
API จะปรับสมดุลบันทึกระหว่างผู้อ่านจนกว่าจะเสร็จสิ้นทั้งหมด ซึ่งหมายความว่าทุกเฟสของแผนที่จะเสร็จสิ้นเกือบจะพร้อมๆ กัน ดูบทความบล็อกนี้เกี่ยวกับวิธีการใช้การแบ่งส่วนข้อมูลแบบไดนามิกใน Google Cloud Dataflow
ดูการกำหนดค่าการแบ่งพาร์ติชันสำหรับรายละเอียดเพิ่มเติม
ทำตามคำแนะนำเหล่านี้
หากไม่มีสภาพแวดล้อม Apache Spark คุณสามารถสร้างคลัสเตอร์ Cloud Dataproc ด้วยการตรวจสอบสิทธิ์ที่กำหนดค่าไว้ล่วงหน้าได้ ตัวอย่างต่อไปนี้ถือว่าคุณกำลังใช้ Cloud Dataproc แต่คุณสามารถใช้ spark-submit
บนคลัสเตอร์ใดก็ได้
คลัสเตอร์ Dataproc ใดๆ ที่ใช้ API จำเป็นต้องมีขอบเขต 'bigquery' หรือ 'cloud-platform' คลัสเตอร์ Dataproc มีขอบเขต 'bigquery' เป็นค่าเริ่มต้น ดังนั้นคลัสเตอร์ส่วนใหญ่ในโปรเจ็กต์ที่เปิดใช้งานควรทำงานตามค่าเริ่มต้น เช่น
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
เวอร์ชันล่าสุดของตัวเชื่อมต่อมีเผยแพร่ต่อสาธารณะในลิงก์ต่อไปนี้:
รุ่น | ลิงค์ |
---|---|
สปาร์ค 3.5 | gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar (ลิงก์ HTTP) |
สปาร์ค 3.4 | gs://spark-lib/bigquery/spark-3.4-bigquery-0.41.0.jar (ลิงก์ HTTP) |
สปาร์ค 3.3 | gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar (ลิงก์ HTTP) |
สปาร์ค 3.2 | gs://spark-lib/bigquery/spark-3.2-bigquery-0.41.0.jar (ลิงก์ HTTP) |
สปาร์ค 3.1 | gs://spark-lib/bigquery/spark-3.1-bigquery-0.41.0.jar (ลิงก์ HTTP) |
สปาร์ค 2.4 | gs://spark-lib/bigquery/spark-2.4-bigquery-0.37.0.jar (ลิงก์ HTTP) |
สกาล่า 2.13 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.41.0.jar (ลิงก์ HTTP) |
สกาล่า 2.12 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar (ลิงก์ HTTP) |
สกาล่า 2.11 | gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar (ลิงก์ HTTP) |
หกเวอร์ชันแรกเป็นตัวเชื่อมต่อที่ใช้ Java ซึ่งกำหนดเป้าหมาย Spark 2.4/3.1/3.2/3.3/3.4/3.5 ของเวอร์ชัน Scala ทั้งหมดที่สร้างขึ้นบน Data Source API ใหม่ (Data Source API v2) ของ Spark
ตัวเชื่อมต่อสองตัวสุดท้ายคือตัวเชื่อมต่อแบบ Scala โปรดใช้ jar ที่เกี่ยวข้องกับการติดตั้ง Spark ของคุณตามที่ระบุไว้ด้านล่าง
ขั้วต่อ สปาร์ค | 2.3 | 2.4 | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
---|---|---|---|---|---|---|---|---|
spark-3.5-bigquery | ||||||||
spark-3.4-bigquery | ||||||||
spark-3.3-bigquery | ||||||||
spark-3.2-bigquery | ||||||||
จุดประกาย-3.1-bigquery | ||||||||
spark-2.4-bigquery | ||||||||
spark-bigquery-ที่มีการพึ่งพา_2.13 | ||||||||
spark-bigquery-ที่มีการพึ่งพา_2.12 | ||||||||
spark-bigquery-ที่มีการพึ่งพา_2.11 |
ตัวเชื่อมต่อ รูปภาพ Dataproc | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | ไร้เซิร์ฟเวอร์ รูปภาพ 1.0 | ไร้เซิร์ฟเวอร์ รูปภาพ 2.0 | ไร้เซิร์ฟเวอร์ ภาพที่ 2.1 | ไร้เซิร์ฟเวอร์ ภาพที่ 2.2 |
---|---|---|---|---|---|---|---|---|---|---|
spark-3.5-bigquery | ||||||||||
spark-3.4-bigquery | ||||||||||
spark-3.3-bigquery | ||||||||||
spark-3.2-bigquery | ||||||||||
จุดประกาย-3.1-bigquery | ||||||||||
spark-2.4-bigquery | ||||||||||
spark-bigquery-ที่มีการพึ่งพา_2.13 | ||||||||||
spark-bigquery-ที่มีการพึ่งพา_2.12 | ||||||||||
spark-bigquery-ที่มีการพึ่งพา_2.11 |
ตัวเชื่อมต่อยังมีให้จากพื้นที่เก็บข้อมูล Maven Central อีกด้วย สามารถใช้โดยใช้ตัวเลือก --packages
หรือคุณสมบัติการกำหนดค่า spark.jars.packages
ใช้ค่าต่อไปนี้
รุ่น | สิ่งประดิษฐ์ตัวเชื่อมต่อ |
---|---|
สปาร์ค 3.5 | com.google.cloud.spark:spark-3.5-bigquery:0.41.0 |
สปาร์ค 3.4 | com.google.cloud.spark:spark-3.4-bigquery:0.41.0 |
สปาร์ค 3.3 | com.google.cloud.spark:spark-3.3-bigquery:0.41.0 |
สปาร์ค 3.2 | com.google.cloud.spark:spark-3.2-bigquery:0.41.0 |
สปาร์ค 3.1 | com.google.cloud.spark:spark-3.1-bigquery:0.41.0 |
สปาร์ค 2.4 | com.google.cloud.spark:spark-2.4-bigquery:0.37.0 |
สกาล่า 2.13 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.13:0.41.0 |
สกาล่า 2.12 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 |
สกาล่า 2.11 | com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.29.0 |
คลัสเตอร์ Dataproc ที่สร้างโดยใช้อิมเมจ 2.1 ขึ้นไป หรือแบทช์ที่ใช้บริการแบบไร้เซิร์ฟเวอร์ Dataproc จะมาพร้อมกับตัวเชื่อมต่อ Spark BigQuery ในตัว การใช้มาตรฐาน --jars
หรือ --packages
(หรืออีกทางหนึ่งคือการกำหนดค่า spark.jars
/ spark.jars.packages
) จะไม่ช่วยในกรณีนี้เนื่องจากตัวเชื่อมต่อในตัวจะมีความสำคัญกว่า
หากต้องการใช้เวอร์ชันอื่นที่ไม่ใช่เวอร์ชันในตัว โปรดดำเนินการอย่างใดอย่างหนึ่งต่อไปนี้:
--metadata SPARK_BQ_CONNECTOR_VERSION=0.41.0
หรือ --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
เพื่อสร้างคลัสเตอร์ด้วย jar อื่น URL สามารถชี้ไปที่ JAR ตัวเชื่อมต่อที่ถูกต้องสำหรับเวอร์ชัน Spark ของคลัสเตอร์--properties dataproc.sparkBqConnector.version=0.41.0
หรือ --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.41.0.jar
เพื่อสร้างแบทช์ด้วย jar อื่น URL สามารถชี้ไปที่ JAR ตัวเชื่อมต่อที่ถูกต้องสำหรับเวอร์ชัน Spark ของรันไทม์ คุณสามารถรันการนับจำนวนคำ PySpark อย่างง่ายกับ API โดยไม่ต้องคอมไพล์โดยการรัน
อิมเมจ Dataproc 1.5 ขึ้นไป
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.41.0.jar
examples/python/shakespeare.py
อิมเมจ Dataproc 1.4 และต่ำกว่า
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER"
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
ตัวเชื่อมต่อใช้ Spark SQL Data Source API ข้ามภาษา:
df = spark.read
.format("bigquery")
.load("bigquery-public-data.samples.shakespeare")
หรือ Scala API โดยนัยเท่านั้น:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
สำหรับข้อมูลเพิ่มเติม โปรดดูตัวอย่างโค้ดเพิ่มเติมใน Python, Scala และ Java
เครื่องมือเชื่อมต่อช่วยให้คุณเรียกใช้การสืบค้น Standard SQL SELECT บน BigQuery และดึงผลลัพธ์ไปยัง Spark Dataframe ได้โดยตรง ทำได้อย่างง่ายดายตามที่อธิบายไว้ในตัวอย่างโค้ดต่อไปนี้:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
ซึ่งให้ผลลัพธ์
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
ตัวเลือกที่สองคือการใช้ตัวเลือก query
ดังนี้:
df = spark.read.format("bigquery").option("query", sql).load()
โปรดสังเกตว่าการดำเนินการควรเร็วขึ้นเนื่องจากเฉพาะผลลัพธ์เท่านั้นที่จะถูกส่งผ่านสาย ในลักษณะเดียวกัน การสืบค้นสามารถรวม JOIN ได้อย่างมีประสิทธิภาพมากขึ้น จากนั้นเรียกใช้การรวมบน Spark หรือใช้คุณสมบัติ BigQuery อื่นๆ เช่น การสืบค้นย่อย ฟังก์ชันที่ผู้ใช้กำหนดของ BigQuery ตารางไวด์การ์ด BigQuery ML และอื่นๆ
เพื่อที่จะใช้คุณสมบัตินี้ จะต้องตั้งค่าการกำหนดค่าต่อไปนี้:
viewsEnabled
เป็น true
materializationDataset
ต้องตั้งค่าเป็นชุดข้อมูลที่ผู้ใช้ GCP มีสิทธิ์ในการสร้างตาราง materializationProject
เป็นทางเลือก หมายเหตุ: ตามที่กล่าวไว้ในเอกสาร BigQuery ตารางที่สืบค้นจะต้องอยู่ในตำแหน่งเดียวกันกับชุด materializationDataset
นอกจากนี้ หากตารางใน SQL statement
มาจากโปรเจ็กต์อื่นที่ไม่ใช่ parentProject
ให้ใช้ชื่อตารางแบบเต็ม เช่น [project].[dataset].[table]
สำคัญ: คุณลักษณะนี้ใช้งานโดยการเรียกใช้แบบสอบถามบน BigQuery และบันทึกผลลัพธ์ลงในตารางชั่วคราว ซึ่ง Spark จะอ่านผลลัพธ์ ซึ่งอาจเพิ่มค่าใช้จ่ายเพิ่มเติมในบัญชี BigQuery ของคุณ
เครื่องมือเชื่อมต่อมีการสนับสนุนเบื้องต้นสำหรับการอ่านจากมุมมอง BigQuery โปรดทราบว่ามีข้อแม้บางประการ:
collect()
หรือ count()
ใดๆ ก็ตามmaterializationProject
และ materializationDataset
ที่เป็นตัวเลือก ตามลำดับ ตัวเลือกเหล่านี้สามารถตั้งค่าได้ทั่วโลกโดยการเรียก spark.conf.set(...)
ก่อนที่จะอ่านมุมมอง.option("viewsEnabled", "true")
) หรือตั้งค่าแบบโกลบอลโดยการเรียก spark.conf.set("viewsEnabled", "true")
materializationDataset
ควรอยู่ในตำแหน่งเดียวกับมุมมองการเขียน DataFrames ไปยัง BigQuery สามารถทำได้สองวิธี: ทางตรงและทางอ้อม
ในวิธีนี้ ข้อมูลจะถูกเขียนโดยตรงไปยัง BigQuery โดยใช้ BigQuery Storage Write API เพื่อเปิดใช้งานตัวเลือกนี้ โปรดตั้งค่าตัวเลือก writeMethod
เป็น direct
ดังที่แสดงด้านล่าง:
df.write
.format("bigquery")
.option("writeMethod", "direct")
.save("dataset.table")
การเขียนลงในตารางที่แบ่งพาร์ติชันที่มีอยู่ (แบ่งพาร์ติชันวันที่ เวลานำเข้าแบ่งพาร์ติชัน และแบ่งพาร์ติชันช่วง) ในโหมดบันทึก APPEND และโหมด OVERWRITE (เฉพาะวันที่และแบ่งพาร์ติชันช่วง) ได้รับการสนับสนุนโดยสมบูรณ์โดยตัวเชื่อมต่อและ BigQuery Storage Write API การใช้ datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
ที่อธิบายไว้ด้านล่าง ไม่ได้รับการสนับสนุนในขณะนี้โดยวิธีการเขียนโดยตรง
สำคัญ: โปรดดูหน้าราคาการนำเข้าข้อมูลที่เกี่ยวข้องกับราคา BigQuery Storage Write API
สำคัญ: โปรดใช้เวอร์ชัน 0.24.2 ขึ้นไปสำหรับการเขียนโดยตรง เนื่องจากเวอร์ชันก่อนหน้ามีข้อบกพร่องที่อาจทำให้ตารางถูกลบในบางกรณี
ในวิธีนี้ ข้อมูลจะถูกเขียนไปยัง GCS ก่อน จากนั้นจึงโหลดลงใน BigQuery ต้องกำหนดค่าที่เก็บข้อมูล GCS เพื่อระบุตำแหน่งข้อมูลชั่วคราว
df.write
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.save("dataset.table")
ข้อมูลจะถูกจัดเก็บชั่วคราวโดยใช้รูปแบบ Apache Parquet, Apache ORC หรือ Apache Avro
ที่เก็บข้อมูล GCS และรูปแบบสามารถตั้งค่าได้ทั่วโลกโดยใช้ RuntimeConfig ของ Spark ดังนี้
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write
.format("bigquery")
.save("dataset.table")
เมื่อสตรีม DataFrame ไปยัง BigQuery แต่ละชุดจะถูกเขียนในลักษณะเดียวกับ DataFrame ที่ไม่ใช่การสตรีม โปรดทราบว่าต้องระบุตำแหน่งจุดตรวจสอบที่เข้ากันได้กับ HDFS (เช่น: path/to/HDFS/dir
หรือ gs://checkpoint-bucket/checkpointDir
)
df.writeStream
.format("bigquery")
.option("temporaryGcsBucket","some-bucket")
.option("checkpointLocation", "some-location")
.option("table", "dataset.table")
สิ่งสำคัญ: ตัวเชื่อมต่อไม่ได้กำหนดค่าตัวเชื่อมต่อ GCS เพื่อหลีกเลี่ยงความขัดแย้งกับตัวเชื่อมต่อ GCS อื่น (หากมี) หากต้องการใช้ความสามารถในการเขียนของตัวเชื่อมต่อ โปรดกำหนดค่าตัวเชื่อมต่อ GCS บนคลัสเตอร์ของคุณตามที่อธิบายไว้ที่นี่
API รองรับตัวเลือกมากมายในการกำหนดค่าการอ่าน
<style> table#propertytable td, ตารางที่ { word-break:break-word } </style>คุณสมบัติ | ความหมาย | การใช้งาน |
---|---|---|
table | ตาราง BigQuery ในรูปแบบ [[project:]dataset.]table ขอแนะนำให้ใช้พารามิเตอร์ path ของ load() / save() แทน ตัวเลือกนี้เลิกใช้แล้วและจะถูกลบออกในเวอร์ชันต่อๆ ไป(เลิกใช้แล้ว) | อ่าน/เขียน |
dataset | ชุดข้อมูลที่มีตาราง ตัวเลือกนี้ควรใช้กับตารางและมุมมองมาตรฐาน แต่ไม่ใช่เมื่อโหลดผลลัพธ์คิวรี (ไม่บังคับเว้นแต่จะละเว้นใน table ) | อ่าน/เขียน |
project | รหัสโปรเจ็กต์ Google Cloud ของตาราง ตัวเลือกนี้ควรใช้กับตารางและมุมมองมาตรฐาน แต่ไม่ใช่เมื่อโหลดผลลัพธ์คิวรี (ไม่บังคับ ค่าเริ่มต้นของโครงการของบัญชีบริการที่ใช้) | อ่าน/เขียน |
parentProject | รหัสโปรเจ็กต์ Google Cloud ของตารางที่จะเรียกเก็บเงินสำหรับการส่งออก (ไม่บังคับ ค่าเริ่มต้นของโครงการของบัญชีบริการที่ใช้) | อ่าน/เขียน |
maxParallelism | จำนวนพาร์ติชันสูงสุดที่จะแบ่งข้อมูลออกเป็น จำนวนจริงอาจน้อยกว่านี้หาก BigQuery เห็นว่าข้อมูลมีขนาดเล็กเพียงพอ หากมีตัวดำเนินการไม่เพียงพอที่จะกำหนดเวลาเครื่องอ่านต่อพาร์ติชัน บางพาร์ติชันอาจว่างเปล่า สำคัญ: ยังคงรองรับพารามิเตอร์เก่า ( parallelism ) แต่อยู่ในโหมดเลิกใช้แล้ว จะถูกลบออกในเวอร์ชัน 1.0 ของตัวเชื่อมต่อ(ไม่บังคับ ค่าเริ่มต้นคือค่า MinParallelism ที่ใหญ่กว่าที่ต้องการและ 20,000)) | อ่าน |
preferredMinParallelism | จำนวนพาร์ติชันขั้นต่ำที่ต้องการเพื่อแบ่งข้อมูลออกเป็น จำนวนจริงอาจน้อยกว่านี้หาก BigQuery เห็นว่าข้อมูลมีขนาดเล็กเพียงพอ หากมีตัวดำเนินการไม่เพียงพอที่จะกำหนดเวลาเครื่องอ่านต่อพาร์ติชัน บางพาร์ติชันอาจว่างเปล่า (ไม่บังคับ ค่าเริ่มต้นจะเล็กที่สุด 3 เท่าของความขนานเริ่มต้นและ maxParallelism ของแอปพลิเคชัน) | อ่าน |
viewsEnabled | ช่วยให้ตัวเชื่อมต่อสามารถอ่านจากมุมมองและไม่ใช่เฉพาะตารางเท่านั้น โปรดอ่านส่วนที่เกี่ยวข้องก่อนที่จะเปิดใช้งานตัวเลือกนี้ (ไม่บังคับ ค่าเริ่มต้นเป็น false ) | อ่าน |
materializationProject | รหัสโปรเจ็กต์ที่จะสร้างมุมมองที่เป็นรูปธรรม (ไม่บังคับ ค่าเริ่มต้นเพื่อดูรหัสโครงการ) | อ่าน |
materializationDataset | ชุดข้อมูลที่กำลังจะสร้างมุมมองที่เป็นรูปธรรม ชุดข้อมูลนี้ควรอยู่ในตำแหน่งเดียวกับมุมมองหรือตารางที่สืบค้น (ไม่บังคับ ค่าเริ่มต้นเพื่อดูชุดข้อมูล) | อ่าน |
materializationExpirationTimeInMinutes | เวลาหมดอายุของตารางชั่วคราวที่เก็บข้อมูลที่เป็นรูปธรรมของมุมมองหรือแบบสอบถาม หน่วยเป็นนาที โปรดสังเกตว่าตัวเชื่อมต่ออาจนำตารางชั่วคราวกลับมาใช้ใหม่ได้เนื่องจากการใช้แคชในเครื่อง และเพื่อลดการคำนวณ BigQuery ดังนั้นค่าที่ต่ำมากอาจทำให้เกิดข้อผิดพลาดได้ ค่าต้องเป็นจำนวนเต็มบวก (ไม่บังคับ ค่าเริ่มต้นคือ 1440 หรือ 24 ชั่วโมง) | อ่าน |
readDataFormat | รูปแบบข้อมูลสำหรับการอ่านจาก BigQuery ตัวเลือก : ARROW , AVRO (ไม่บังคับ ค่าเริ่มต้นคือ ARROW ) | อ่าน |
optimizedEmptyProjection | ตัวเชื่อมต่อใช้ตรรกะการฉายภาพว่างที่ได้รับการปรับปรุง (เลือกโดยไม่มีคอลัมน์ใดๆ) ซึ่งใช้สำหรับการดำเนินการ count() ตรรกะนี้รับข้อมูลโดยตรงจากเมตาดาต้าของตาราง หรือดำเนินการ `SELECT COUNT(*) WHERE...` ที่มีประสิทธิภาพมากในกรณีที่มีตัวกรอง คุณสามารถยกเลิกการใช้ตรรกะนี้ได้โดยตั้งค่าตัวเลือกนี้เป็น false (ไม่บังคับ ค่าเริ่มต้นเป็น true ) | อ่าน |
pushAllFilters | หากตั้งค่าเป็น true ตัวเชื่อมต่อจะพุชตัวกรองทั้งหมดที่ Spark สามารถมอบหมายให้กับ BigQuery Storage API ซึ่งจะช่วยลดปริมาณข้อมูลที่จำเป็นต้องส่งจากเซิร์ฟเวอร์ BigQuery Storage API ไปยังไคลเอนต์ Spark ตัวเลือกนี้เลิกใช้แล้วและจะถูกลบออกในเวอร์ชันต่อๆ ไป(ไม่บังคับ ค่าเริ่มต้นเป็น true )(เลิกใช้แล้ว) | อ่าน |
bigQueryJobLabel | ใช้เพื่อเพิ่มป้ายกำกับให้กับคำค้นหาที่เริ่มต้นโดยเครื่องมือเชื่อมต่อและโหลดงาน BigQuery สามารถตั้งค่าป้ายกำกับได้หลายรายการ (ไม่จำเป็น) | อ่าน |
bigQueryTableLabel | สามารถใช้เพื่อเพิ่มป้ายกำกับลงในตารางขณะเขียนลงในตาราง สามารถตั้งค่าป้ายกำกับได้หลายรายการ (ไม่จำเป็น) | เขียน |
traceApplicationName | ชื่อแอปพลิเคชันที่ใช้ในการติดตามเซสชันการอ่านและเขียน BigQuery Storage จำเป็นต้องตั้งชื่อแอปพลิเคชันเพื่อตั้งค่า ID ติดตามในเซสชัน (ไม่จำเป็น) | อ่าน |
traceJobId | รหัสงานที่ใช้ในการติดตามเซสชันการอ่านและเขียน BigQuery Storage (ไม่บังคับ มีค่าเริ่มต้นของรหัสงาน Dataproc มิฉะนั้นจะใช้รหัสแอปพลิเคชัน Spark) | อ่าน |
createDisposition | ระบุว่างานได้รับอนุญาตให้สร้างตารางใหม่หรือไม่ ค่าที่อนุญาตคือ:
(ไม่บังคับ ค่าเริ่มต้นเป็น CREATE_IF_NEEDED) | เขียน |
writeMethod | ควบคุมวิธีการเขียนข้อมูลไปยัง BigQuery ค่าที่ใช้ได้คือการใช้ BigQuery Storage Write API direct และ indirect ซึ่งจะเขียนข้อมูลไปที่ GCS ก่อน จากนั้นจะทริกเกอร์การดำเนินการโหลด BigQuery ดูเพิ่มเติมที่นี่(ไม่บังคับ ค่าเริ่มต้นเป็น indirect ) | เขียน |
writeAtLeastOnce | รับประกันว่าข้อมูลจะถูกเขียนไปยัง BigQuery อย่างน้อยหนึ่งครั้ง นี่เป็นการรับประกันน้อยกว่าหนึ่งครั้ง เหมาะสำหรับสถานการณ์การสตรีมที่มีการเขียนข้อมูลอย่างต่อเนื่องเป็นชุดเล็กๆ (ไม่บังคับ ค่าเริ่มต้นเป็น false )รองรับเฉพาะวิธีการเขียน "DIRECT" และโหมดเท่านั้น ไม่ใช่ "เขียนทับ" | เขียน |
temporaryGcsBucket | ที่เก็บข้อมูล GCS ที่เก็บข้อมูลไว้ชั่วคราวก่อนที่จะโหลดไปยัง BigQuery จำเป็นเว้นแต่จะตั้งค่าในการกำหนดค่า Spark ( spark.conf.set(...) )ไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
persistentGcsBucket | ที่เก็บข้อมูล GCS ที่เก็บข้อมูลก่อนที่จะโหลดไปยัง BigQuery หากได้รับแจ้ง ข้อมูลจะไม่ถูกลบหลังจากเขียนข้อมูลลงใน BigQuery ไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
persistentGcsPath | เส้นทาง GCS ที่เก็บข้อมูลก่อนที่จะโหลดไปยัง BigQuery ใช้เฉพาะกับ persistentGcsBucket เท่านั้นไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
intermediateFormat | รูปแบบของข้อมูลก่อนที่จะโหลดไปยัง BigQuery ค่าอาจเป็น "parquet", "orc" หรือ "avro" ก็ได้ หากต้องการใช้รูปแบบ Avro จะต้องเพิ่มแพ็คเกจ spark-avro ในรันไทม์ (ไม่บังคับ ค่าเริ่มต้นคือ parquet ) ในการเขียนเท่านั้น รองรับเฉพาะวิธีการเขียน `INDIRECT` เท่านั้น | เขียน |
useAvroLogicalTypes | เมื่อโหลดจาก Avro (`.option("intermediateFormat", "avro")`) BigQuery จะใช้ประเภท Avro พื้นฐานแทนประเภทตรรกะ [ตามค่าเริ่มต้น](https://cloud.google.com/bigquery/docs/ กำลังโหลดข้อมูล-cloud-storage-avro#logic_types) การระบุตัวเลือกนี้จะแปลงประเภทลอจิคัล Avro เป็นประเภทข้อมูล BigQuery ที่เกี่ยวข้อง (ไม่บังคับ ค่าเริ่มต้นเป็น false ) ในการเขียนเท่านั้น | เขียน |
datePartition | พาร์ติชันวันที่ที่จะเขียนข้อมูลลงไป ควรเป็นสตริงวันที่ที่กำหนดในรูปแบบ YYYYMMDD สามารถใช้เพื่อเขียนทับข้อมูลของพาร์ติชันเดียวได้ เช่นนี้
(ไม่จำเป็น). ในการเขียนเท่านั้น สามารถใช้ได้กับพาร์ติชั่นประเภทต่างๆ เช่น: ชั่วโมง: YYYYMMDDHH เดือน: YYYYMM ปี: YYYY ไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
partitionField | หากระบุฟิลด์นี้ ตารางจะถูกแบ่งพาร์ติชันตามฟิลด์นี้ สำหรับการแบ่งพาร์ติชันเวลา ให้ระบุพร้อมกับตัวเลือก `partitionType` สำหรับการแบ่งพาร์ติชันช่วงจำนวนเต็ม ให้ระบุพร้อมกับ 3 ตัวเลือก: `partitionRangeStart`, `partitionRangeEnd, `partitionRangeInterval` ฟิลด์ดังกล่าวต้องเป็นฟิลด์ TIMESTAMP หรือ DATE ระดับบนสุดสำหรับการแบ่งพาร์ติชันเวลา หรือ INT64 สำหรับการแบ่งพาร์ติชันช่วงจำนวนเต็ม โหมดของมันจะต้องเป็น NULLABLE หรือ REQUIRED หากไม่ได้ตั้งค่าตัวเลือกสำหรับตารางที่แบ่งพาร์ติชันเวลา ตารางจะถูกแบ่งพาร์ติชันตามคอลัมน์หลอก ซึ่งอ้างอิงผ่าน '_PARTITIONTIME' as TIMESTAMP หรือ '_PARTITIONDATE' as DATE (ไม่จำเป็น). ไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
partitionExpirationMs | จำนวนมิลลิวินาทีที่จะเก็บหน่วยเก็บข้อมูลสำหรับพาร์ติชันในตาราง ที่เก็บข้อมูลในพาร์ติชันจะมีเวลาหมดอายุของเวลาพาร์ติชันบวกด้วยค่านี้ (ไม่จำเป็น). ไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
partitionType | ใช้เพื่อระบุการแบ่งเวลา ประเภทที่รองรับคือ: HOUR, DAY, MONTH, YEAR ตัวเลือกนี้ จำเป็น สำหรับตารางเป้าหมายที่จะแบ่งพาร์ติชันเวลา (ไม่บังคับ ค่าเริ่มต้นเป็น DAY หากระบุ PartitionField) ไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
partitionRangeStart , partitionRangeEnd , partitionRangeInterval | ใช้เพื่อระบุการแบ่งพาร์ติชันช่วงจำนวนเต็ม ตัวเลือกเหล่านี้ จำเป็น สำหรับตารางเป้าหมายที่จะแบ่งพาร์ติชันช่วงจำนวนเต็ม ต้องระบุทั้ง 3 ตัวเลือก ไม่รองรับวิธีการเขียน 'DIRECT` | เขียน |
clusteredFields | สตริงของคอลัมน์ระดับบนสุดที่ไม่ซ้ำกัน คั่นด้วยเครื่องหมายจุลภาค (ไม่จำเป็น). | เขียน |
allowFieldAddition | เพิ่ม ALLOW_FIELD_ADDITION SchemaUpdateOption ให้กับ BigQuery LoadJob ค่าที่อนุญาตคือ true และ false (ไม่บังคับ ค่าเริ่มต้นเป็น false )รองรับเฉพาะวิธีการเขียน `INDIRECT` เท่านั้น | เขียน |
allowFieldRelaxation | เพิ่ม ALLOW_FIELD_RELAXATION SchemaUpdateOption ให้กับ BigQuery LoadJob ค่าที่อนุญาตคือ true และ false (ไม่บังคับ ค่าเริ่มต้นเป็น false )รองรับเฉพาะวิธีการเขียน `INDIRECT` เท่านั้น | เขียน |
proxyAddress | ที่อยู่ของพร็อกซีเซิร์ฟเวอร์ พร็อกซีต้องเป็นพร็อกซี HTTP และที่อยู่ควรอยู่ในรูปแบบ `host:port` สามารถตั้งค่าอีกทางหนึ่งได้ในการกำหนดค่า Spark ( spark.conf.set(...) ) หรือในการกำหนดค่า Hadoop ( fs.gs.proxy.address )(ไม่บังคับ จำเป็นเฉพาะเมื่อเชื่อมต่อกับ GCP ผ่านพร็อกซี) | อ่าน/เขียน |
proxyUsername | ชื่อผู้ใช้ที่ใช้เชื่อมต่อกับพร็อกซี สามารถตั้งค่าอีกทางหนึ่งได้ในการกำหนดค่า Spark ( spark.conf.set(...) ) หรือในการกำหนดค่า Hadoop ( fs.gs.proxy.username )(ไม่บังคับ จำเป็นเฉพาะเมื่อเชื่อมต่อกับ GCP ผ่านพร็อกซีที่มีการตรวจสอบสิทธิ์) | อ่าน/เขียน |
proxyPassword | รหัสผ่านที่ใช้เชื่อมต่อกับพร็อกซี สามารถตั้งค่าอีกทางหนึ่งได้ในการกำหนดค่า Spark ( spark.conf.set(...) ) หรือในการกำหนดค่า Hadoop ( fs.gs.proxy.password )(ไม่บังคับ จำเป็นเฉพาะเมื่อเชื่อมต่อกับ GCP ผ่านพร็อกซีที่มีการตรวจสอบสิทธิ์) | อ่าน/เขียน |
httpMaxRetry | จำนวนครั้งสูงสุดในการลองส่งคำขอ HTTP ระดับต่ำไปยัง BigQuery สามารถตั้งค่าอีกทางหนึ่งได้ในการกำหนดค่า Spark ( spark.conf.set("httpMaxRetry", ...) ) หรือใน Hadoop Configuration ( fs.gs.http.max.retry )(ไม่บังคับ ค่าเริ่มต้นคือ 10) | อ่าน/เขียน |
httpConnectTimeout | ระยะหมดเวลาเป็นมิลลิวินาทีเพื่อสร้างการเชื่อมต่อกับ BigQuery สามารถตั้งค่าอีกทางหนึ่งได้ในการกำหนดค่า Spark ( spark.conf.set("httpConnectTimeout", ...) ) หรือในการกำหนดค่า Hadoop ( fs.gs.http.connect-timeout )(ไม่บังคับ ค่าเริ่มต้นคือ 60,000 ms 0 สำหรับการหมดเวลาแบบไม่สิ้นสุด ซึ่งเป็นจำนวนลบคือ 20,000) | อ่าน/เขียน |
httpReadTimeout | การหมดเวลาในหน่วยมิลลิวินาทีเพื่ออ่านข้อมูลจากการเชื่อมต่อที่สร้างขึ้น สามารถตั้งค่าอีกทางหนึ่งได้ในการกำหนดค่า Spark ( spark.conf.set("httpReadTimeout", ...) ) หรือในการกำหนดค่า Hadoop ( fs.gs.http.read-timeout )(ไม่บังคับ ค่าเริ่มต้นคือ 60,000 ms 0 สำหรับการหมดเวลาแบบไม่สิ้นสุด ซึ่งเป็นจำนวนลบคือ 20,000) | อ่าน |
arrowCompressionCodec | ตัวแปลงสัญญาณการบีบอัดขณะอ่านจากตาราง BigQuery เมื่อใช้รูปแบบลูกศร ตัวเลือก : ZSTD (Zstandard compression) , LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) , COMPRESSION_UNSPECIFIED ตัวแปลงสัญญาณการบีบอัดที่แนะนำคือ ZSTD ในขณะที่ใช้ Java(ไม่บังคับ ค่าเริ่มต้นคือ COMPRESSION_UNSPECIFIED ซึ่งหมายความว่าจะไม่มีการใช้การบีบอัด) | อ่าน |
responseCompressionCodec | ตัวแปลงสัญญาณการบีบอัดที่ใช้ในการบีบอัดข้อมูล ReadRowsResponse ตัวเลือก: RESPONSE_COMPRESSION_CODEC_UNSPECIFIED , RESPONSE_COMPRESSION_CODEC_LZ4 (ไม่บังคับ ค่าเริ่มต้นคือ RESPONSE_COMPRESSION_CODEC_UNSPECIFIED ซึ่งหมายความว่าจะไม่มีการใช้การบีบอัด) | อ่าน |
cacheExpirationTimeInMinutes | เวลาหมดอายุของแคชในหน่วยความจำที่จัดเก็บข้อมูลการสืบค้น หากต้องการปิดใช้งานการแคช ให้ตั้งค่าเป็น 0 (ไม่บังคับ ค่าเริ่มต้นคือ 15 นาที) | อ่าน |
enableModeCheckForSchemaFields | ตรวจสอบโหมดของทุกฟิลด์ในสคีมาปลายทางให้เท่ากับโหมดในสคีมาฟิลด์ต้นทางที่สอดคล้องกันระหว่างการเขียน DIRECT ค่าเริ่มต้นเป็นจริง กล่าวคือ การตรวจสอบเสร็จสิ้นตามค่าเริ่มต้น หากตั้งค่าเป็นเท็จ การตรวจสอบโหมดจะถูกละเว้น | เขียน |
enableListInference | ระบุว่าจะใช้การอนุมานสคีมาโดยเฉพาะหรือไม่เมื่อโหมดเป็น Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions) ค่าเริ่มต้นเป็นเท็จ | เขียน |
bqChannelPoolSize | ขนาด (คงที่) ของพูลช่อง gRPC ที่สร้างโดย BigQueryReadClient เพื่อประสิทธิภาพสูงสุด ควรตั้งค่าเป็นอย่างน้อยจำนวนคอร์บนตัวดำเนินการคลัสเตอร์ | อ่าน |
createReadSessionTimeoutInSeconds | การหมดเวลาเป็นวินาทีเพื่อสร้าง ReadSession เมื่ออ่านตาราง สำหรับตารางที่มีขนาดใหญ่มาก ควรเพิ่มค่านี้ (ไม่บังคับ ค่าเริ่มต้นคือ 600 วินาที) | อ่าน |
queryJobPriority | ลำดับความสำคัญที่กำหนดไว้สำหรับงานขณะอ่านข้อมูลจากการค้นหา BigQuery ค่าที่อนุญาตคือ:
(ไม่บังคับ ค่าเริ่มต้นเป็น INTERACTIVE ) | อ่าน/เขียน |
destinationTableKmsKeyName | อธิบายคีย์การเข้ารหัส Cloud KMS ที่จะใช้เพื่อปกป้องตาราง BigQuery ปลายทาง บัญชีบริการ BigQuery ที่เชื่อมโยงกับโปรเจ็กต์ของคุณจำเป็นต้องเข้าถึงคีย์การเข้ารหัสนี้ สำหรับข้อมูลเพิ่มเติมเกี่ยวกับการใช้ CMEK กับ BigQuery โปรดดู[ที่นี่](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id) หมายเหตุ: ตารางจะถูกเข้ารหัสโดยคีย์เฉพาะในกรณีที่สร้างโดยตัวเชื่อมต่อเท่านั้น ตารางที่ไม่ได้เข้ารหัสที่มีอยู่แล้วจะไม่ถูกเข้ารหัสเพียงแค่ตั้งค่าตัวเลือกนี้ (ไม่จำเป็น) | เขียน |
allowMapTypeConversion | การกำหนดค่าบูลีนเพื่อปิดการแปลงจากบันทึก BigQuery เป็น Spark MapType เมื่อบันทึกมีสองฟิลด์ย่อยที่มีชื่อฟิลด์เป็น key และ value ค่าเริ่มต้นเป็น true ซึ่งทำให้สามารถแปลงได้(ไม่จำเป็น) | อ่าน |
spark.sql.sources.partitionOverwriteMode | กำหนดค่าเพื่อระบุโหมดเขียนทับในการเขียนเมื่อตารางมีการแบ่งพาร์ติชันช่วง/เวลา ปัจจุบันรองรับสองโหมด: STATIC และ DYNAMIC ในโหมด STATIC ตารางทั้งหมดจะถูกเขียนทับ ในโหมด DYNAMIC ข้อมูลจะถูกเขียนทับโดยพาร์ติชันของตารางที่มีอยู่ ค่าเริ่มต้นคือ STATIC (ไม่จำเป็น) | เขียน |
enableReadSessionCaching | การกำหนดค่าบูลีนเพื่อปิดใช้งานการแคชเซสชันการอ่าน แคชเซสชันการอ่าน BigQuery เพื่อให้วางแผนการค้นหา Spark ได้เร็วขึ้น ค่าเริ่มต้นเป็น true (ไม่จำเป็น) | อ่าน |
readSessionCacheDurationMins | กำหนดค่าเพื่อตั้งค่าระยะเวลาการแคชเซสชันการอ่านเป็นนาที ใช้งานได้เฉพาะในกรณีที่ enableReadSessionCaching เป็น true (ค่าเริ่มต้น) อนุญาตให้ระบุระยะเวลาในการแคชเซสชันการอ่าน ค่าสูงสุดที่อนุญาตคือ 300 ค่าเริ่มต้นคือ 5 (ไม่จำเป็น) | อ่าน |
bigQueryJobTimeoutInMinutes | กำหนดค่าเพื่อตั้งค่าระยะหมดเวลาของงาน BigQuery เป็นนาที ค่าเริ่มต้นคือ 360 นาที(ไม่จำเป็น) | อ่าน/เขียน |
snapshotTimeMillis | การประทับเวลาที่ระบุเป็นมิลลิวินาทีเพื่อใช้ในการอ่านสแนปชอตของตาราง โดยค่าเริ่มต้น จะไม่มีการตั้งค่าไว้และจะอ่านตารางเวอร์ชันล่าสุด (ไม่จำเป็น) | อ่าน |
bigNumericDefaultPrecision | ความแม่นยำเริ่มต้นทางเลือกสำหรับช่อง BigNumeric เนื่องจากค่าเริ่มต้นของ BigQuery กว้างเกินไปสำหรับ Spark ค่าสามารถอยู่ระหว่าง 1 ถึง 38 ค่าเริ่มต้นนี้จะใช้เฉพาะเมื่อฟิลด์มีประเภท BigNumeric ที่ไม่มีพารามิเตอร์ โปรดทราบว่าข้อมูลอาจสูญหายได้หากความแม่นยำของข้อมูลจริงมากกว่าที่ระบุไว้ (ไม่จำเป็น) | อ่าน/เขียน |
bigNumericDefaultScale | มาตราส่วนเริ่มต้นทางเลือกสำหรับฟิลด์ BigNumeric ค่าสามารถอยู่ระหว่าง 0 ถึง 38 และน้อยกว่า bigNumericFieldsPrecision ค่าเริ่มต้นนี้จะใช้เฉพาะเมื่อฟิลด์มีประเภท BigNumeric ที่ไม่มีพารามิเตอร์ โปรดทราบว่าข้อมูลอาจสูญหายได้หากขนาดของข้อมูลจริงมากกว่าที่ระบุไว้ (ไม่จำเป็น) | อ่าน/เขียน |
ตัวเลือกยังสามารถตั้งค่านอกโค้ดได้ โดยใช้พารามิเตอร์ --conf
ของ spark-submit
หรือ --properties
พารามิเตอร์ของ gcloud dataproc submit spark
หากต้องการใช้สิ่งนี้ ให้เพิ่มคำนำหน้า spark.datasource.bigquery.
ไปยังตัวเลือกใดๆ เช่น spark.conf.set("temporaryGcsBucket", "some-bucket")
ยังสามารถตั้งค่าเป็น --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
ยกเว้น DATETIME
และ TIME
ประเภทข้อมูล BigQuery ทั้งหมดที่กำหนดให้แมปกับประเภทข้อมูล Spark SQL ที่สอดคล้องกัน นี่คือการแมปทั้งหมด:
ประเภทข้อมูล SQL มาตรฐานของ BigQuery | สปาร์ค SQL ประเภทข้อมูล | หมายเหตุ |
BOOL | BooleanType | |
INT64 | LongType | |
FLOAT64 | DoubleType | |
NUMERIC | DecimalType | โปรดดูการสนับสนุนตัวเลขและ BigNumeric |
BIGNUMERIC | DecimalType | โปรดดูการสนับสนุนตัวเลขและ BigNumeric |
STRING | StringType | |
BYTES | BinaryType | |
STRUCT | StructType | |
ARRAY | ArrayType | |
TIMESTAMP | TimestampType | |
DATE | DateType | |
DATETIME | StringType , TimestampNTZType * | Spark ไม่มีประเภท DATETIME สตริง Spark สามารถเขียนลงในคอลัมน์ BQ DATETIME ที่มีอยู่ได้ โดยต้องอยู่ในรูปแบบสำหรับตัวอักษร BQ DATETIME * สำหรับ Spark 3.4+ นั้น BQ DATETIME จะถูกอ่านเป็นประเภท TimestampNTZ ของ Spark เช่น java LocalDateTime |
TIME | LongType , StringType * | Spark ไม่มีประเภท TIME ความยาวที่สร้างขึ้นซึ่งระบุไมโครวินาทีตั้งแต่เที่ยงคืนสามารถส่งไปยัง TimestampType ได้อย่างปลอดภัย แต่จะทำให้วันที่ถูกอนุมานเป็นวันปัจจุบัน ดังนั้นเวลาจะเหลือตามระยะเวลาที่ผู้ใช้ร่ายได้หากต้องการ เมื่อส่งไปที่ Timestamp TIME จะมีปัญหา TimeZone เช่นเดียวกับ DATETIME * สตริง Spark สามารถเขียนลงในคอลัมน์ BQ TIME ที่มีอยู่ได้ โดยต้องอยู่ในรูปแบบสำหรับตัวอักษร BQ TIME |
JSON | StringType | Spark ไม่มีประเภท JSON ค่าจะถูกอ่านเป็นสตริง ในการเขียน JSON กลับไปยัง BigQuery จำเป็นต้องมี เงื่อนไขต่อไปนี้:
|
ARRAY<STRUCT<key,value>> | MapType | BigQuery ไม่มีประเภท MAP ดังนั้นจึงคล้ายกับการแปลงอื่นๆ เช่น งาน Apache Avro และ BigQuery Load โดยตัวเชื่อมต่อจะแปลง Spark Map เป็น REPEATED STRUCT<key,value> ซึ่งหมายความว่าในขณะที่เขียนและอ่านแผนที่ได้ แต่ระบบไม่รองรับการเรียกใช้ SQL บน BigQuery ที่ใช้ซีแมนทิกส์ของแผนที่ หากต้องการอ้างอิงค่าของแผนที่โดยใช้ BigQuery SQL โปรดตรวจสอบเอกสาร BigQuery เนื่องจากความไม่เข้ากันเหล่านี้ จึงมีข้อจำกัดบางประการ:
|
รองรับ Spark ML Vector และ Matrix รวมถึงเวอร์ชันหนาแน่นและเบาบาง ข้อมูลจะถูกบันทึกเป็น BigQuery RECORD โปรดสังเกตว่ามีการเพิ่มคำต่อท้ายในคำอธิบายของฟิลด์ซึ่งรวมถึงประเภทประกายไฟของฟิลด์ด้วย
หากต้องการเขียนประเภทเหล่านั้นไปยัง BigQuery ให้ใช้รูปแบบกลาง ORC หรือ Avro และกำหนดให้เป็นคอลัมน์ของแถว (นั่นคือ ไม่ใช่ฟิลด์ในโครงสร้าง)
BigNumeric ของ BigQuery มีความแม่นยำ 76.76 (หลักที่ 77 เป็นบางส่วน) และมาตราส่วน 38 เนื่องจากความแม่นยำและมาตราส่วนนี้อยู่นอกเหนือการสนับสนุน DecimalType (มาตราส่วน 38 และความแม่นยำ 38) ของ Spark หมายความว่าไม่สามารถใช้ช่อง BigNumeric ที่มีความแม่นยำมากกว่า 38 ได้ . เมื่อข้อจำกัดของ Spark นี้ได้รับการอัปเดตแล้ว ตัวเชื่อมต่อก็จะได้รับการอัปเดตตามนั้น
การแปลงตัวเลข Spark Decimal/BigQuery พยายามรักษาการกำหนดพารามิเตอร์ของประเภท เช่น NUMERIC(10,2)
จะถูกแปลงเป็น Decimal(10,2)
และในทางกลับกัน โปรดสังเกตว่ามีหลายกรณีที่พารามิเตอร์หายไป ซึ่งหมายความว่าพารามิเตอร์จะเปลี่ยนกลับเป็นค่าเริ่มต้น - NUMERIC (38,9) และ BIGNUMERIC (76,38) ซึ่งหมายความว่าในขณะนี้ การอ่าน BigNumeric ได้รับการสนับสนุนจากตารางมาตรฐานเท่านั้น แต่ไม่รองรับจากมุมมอง BigQuery หรือเมื่ออ่านข้อมูลจากการค้นหา BigQuery
ตัวเชื่อมต่อจะคำนวณคอลัมน์โดยอัตโนมัติและตัวกรองแบบกดลงจะกรองคำสั่ง SELECT
ของ DataFrame เช่น
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
กรอง word
ในคอลัมน์และกดตัวกรองภาคแสดง word = 'hamlet' or word = 'Claudius'
หากคุณไม่ต้องการส่งคำขออ่านหลายรายการไปยัง BigQuery คุณสามารถแคช DataFrame ก่อนที่จะกรอง เช่น:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
คุณยังสามารถระบุตัวเลือก filter
ด้วยตนเอง ซึ่งจะแทนที่การกดลงอัตโนมัติ และ Spark จะดำเนินการกรองส่วนที่เหลือในไคลเอนต์
คอลัมน์หลอก _PARTITIONDATE และ _PARTITIONTIME ไม่ได้เป็นส่วนหนึ่งของสคีมาตาราง ดังนั้นเพื่อที่จะสอบถามโดยพาร์ติชั่นของตารางที่แบ่งพาร์ติชั่นอย่าใช้เมธอดwhere() ที่แสดงด้านบน ให้เพิ่มตัวเลือกตัวกรองในลักษณะต่อไปนี้แทน:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
...
.load(TABLE)
ตามค่าเริ่มต้น ตัวเชื่อมต่อจะสร้างหนึ่งพาร์ติชันต่อ 400MB ในตารางที่กำลังอ่าน (ก่อนที่จะกรอง) โดยคร่าวๆ ควรสอดคล้องกับจำนวนผู้อ่านสูงสุดที่ BigQuery Storage API รองรับ ซึ่งสามารถกำหนดค่าได้อย่างชัดเจนด้วยคุณสมบัติ maxParallelism
BigQuery อาจจำกัดจำนวนพาร์ติชันตามข้อจำกัดของเซิร์ฟเวอร์
เพื่อรองรับการติดตามการใช้ทรัพยากร BigQuery ตัวเชื่อมต่อเสนอตัวเลือกต่อไปนี้ในการแท็กทรัพยากร BigQuery:
เครื่องมือเชื่อมต่อสามารถเรียกใช้งานการโหลดและการค้นหาของ BigQuery ได้ การเพิ่มป้ายกำกับให้กับงานทำได้ในลักษณะต่อไปนี้:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
สิ่งนี้จะสร้างป้ายกำกับ cost_center
= analytics
และ usage
= nightly_etl
ใช้เพื่ออธิบายเซสชันการอ่านและเขียน ID การติดตามอยู่ในรูปแบบ Spark:ApplicationName:JobID
นี่คือตัวเลือกแบบเลือกใช้ และหากต้องการใช้งาน ผู้ใช้จำเป็นต้องตั้งค่าคุณสมบัติ traceApplicationName
JobID สร้างขึ้นโดยอัตโนมัติโดยรหัสงาน Dataproc โดยมีรหัสสำรองเป็นรหัสแอปพลิเคชัน Spark (เช่น application_1648082975639_0001
) รหัสงานสามารถแทนที่ได้โดยการตั้งค่าตัวเลือก traceJobId
โปรดสังเกตว่าความยาวรวมของ ID ติดตามต้องไม่เกิน 256 อักขระ
สามารถใช้ตัวเชื่อมต่อในโน้ตบุ๊ก Jupyter ได้แม้ว่าจะไม่ได้ติดตั้งบนคลัสเตอร์ Spark ก็ตาม สามารถเพิ่มเป็น jar ภายนอกได้โดยใช้รหัสต่อไปนี้:
หลาม:
from pyspark . sql import SparkSession
spark = SparkSession . builder
. config ( "spark.jars.packages" , "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0" )
. getOrCreate ()
df = spark . read . format ( "bigquery" )
. load ( "dataset.table" )
สกาล่า:
val spark = SparkSession .builder
.config( " spark.jars.packages " , " com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.41.0 " )
.getOrCreate()
val df = spark.read.format( " bigquery " )
.load( " dataset.table " )
ในกรณีที่คลัสเตอร์ Spark ใช้ Scala 2.12 (เป็นทางเลือกสำหรับ Spark 2.4.x ซึ่งจำเป็นใน 3.0.x) แพ็คเกจที่เกี่ยวข้องคือ com.google.cloud.spark:spark-bigquery-with-dependencies_ 2.12 :0.41.0 หากต้องการทราบว่าใช้เวอร์ชัน Scala ใด โปรดเรียกใช้โค้ดต่อไปนี้:
หลาม:
spark . sparkContext . _jvm . scala . util . Properties . versionString ()
สกาล่า:
scala . util . Properties . versionString
เว้นแต่ว่าคุณต้องการใช้ Scala API โดยนัย spark.read.bigquery("TABLE_ID")
ก็ไม่จำเป็นต้องคอมไพล์กับตัวเชื่อมต่อ
หากต้องการรวมตัวเชื่อมต่อในโครงการของคุณ:
< dependency >
< groupId >com.google.cloud.spark</ groupId >
< artifactId >spark-bigquery-with-dependencies_${scala.version}</ artifactId >
< version >0.41.0</ version >
</ dependency >
libraryDependencies + = " com.google.cloud.spark " %% " spark-bigquery-with-dependencies " % " 0.41.0 "
Spark เติมตัววัดจำนวนมากซึ่งผู้ใช้ปลายทางสามารถพบได้ในหน้าประวัติ Spark แต่ตัวชี้วัดทั้งหมดนี้เกี่ยวข้องกับประกายไฟซึ่งจะถูกรวบรวมโดยปริยายโดยไม่มีการเปลี่ยนแปลงจากตัวเชื่อมต่อ แต่มีเมตริกจำนวนหนึ่งที่เติมจาก BigQuery และปัจจุบันมองเห็นได้ในบันทึกของแอปพลิเคชัน ซึ่งสามารถอ่านได้ในบันทึกของไดรเวอร์/ตัวดำเนินการ
จาก Spark 3.2 เป็นต้นไป Spark ได้จัดเตรียม API ให้เปิดเผยตัวชี้วัดที่กำหนดเองในหน้า Spark UI https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric /custommetric.html
ขณะนี้การใช้ API นี้ตัวเชื่อมต่อจะเปิดเผยตัวชี้วัด BigQuery ต่อไปนี้ในระหว่างการอ่าน
<style> ตาราง#metricstable td, ตาราง th th {break: break-word} </style>ชื่อเมตริก | คำอธิบาย |
---|---|
bytes read | จำนวน BigQuery Bytes อ่าน |
rows read | จำนวนแถว BigQuery อ่าน |
scan time | จำนวนเวลาที่ใช้ระหว่างการตอบสนองของแถวอ่านที่ร้องขอให้ได้รับจากผู้บริหารทั้งหมดในมิลลิวินาที |
parse time | ระยะเวลาที่ใช้ในการแยกแถวอ่านข้ามผู้บริหารทั้งหมดในมิลลิวินาที |
spark time | ระยะเวลาที่ใช้ใน Spark เพื่อประมวลผลการสืบค้น (เช่นนอกเหนือจากการสแกนและการแยกวิเคราะห์) ในผู้บริหารทั้งหมดในมิลลิวินาที |
หมายเหตุ: ในการใช้ตัวชี้วัดในหน้า Spark UI คุณต้องตรวจสอบให้แน่ใจว่า spark-bigquery-metrics-0.41.0.jar
เป็นเส้นทางชั้นเรียนก่อนที่จะเริ่มต้นประวัติศาสตร์เซิร์ฟเวอร์และตัวเชื่อมต่อคือ spark-3.2
หรือสูงกว่า
ดูเอกสารการกำหนดราคา BigQuery
คุณสามารถตั้งค่าจำนวนพาร์ติชันด้วยตนเองด้วยคุณสมบัติ maxParallelism
BigQuery อาจให้พาร์ติชันน้อยกว่าที่คุณขอ ดูการกำหนดค่าพาร์ติชัน
คุณยังสามารถเริ่มต้นใหม่หลังจากอ่านใน Spark
หากมีพาร์ติชั่นมากเกินไปอาจเกินพาร์ทิชัน createWritestream หรือโควตาปริมาณงาน สิ่งนี้เกิดขึ้นเนื่องจากในขณะที่ข้อมูลภายในแต่ละพาร์ทิชันได้รับการประมวลผลแบบอนุกรมพาร์ติชันอิสระอาจถูกประมวลผลแบบขนานบนโหนดที่แตกต่างกันภายในกลุ่ม Spark โดยทั่วไปเพื่อให้แน่ใจว่าปริมาณงานที่ยั่งยืนสูงสุดคุณควรยื่นคำขอเพิ่มโควต้า อย่างไรก็ตามคุณสามารถลดจำนวนพาร์ติชันที่เขียนด้วยตนเองโดยเรียก coalesce
บน DataFrame เพื่อลดปัญหานี้
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
กฎง่ายๆคือการมีการจัดการพาร์ติชันเดียวอย่างน้อย 1GB ของข้อมูล
นอกจากนี้โปรดทราบว่างานที่ทำงานกับคุณสมบัติ writeAtLeastOnce
ที่เปิดอยู่จะไม่พบข้อผิดพลาดของโควต้า CreateWritestream
ตัวเชื่อมต่อต้องการอินสแตนซ์ของ googlecredentials เพื่อเชื่อมต่อกับ API BigQuery มีหลายตัวเลือกที่จะให้:
GOOGLE_APPLICATION_CREDENTIALS
ตามที่อธิบายไว้ที่นี่ // Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
gcpAccessTokenProvider
AccessTokenProvider
จะต้องดำเนินการใน Java หรือภาษา JVM อื่น ๆ เช่น Scala หรือ Kotlin มันจะต้องมีตัวสร้าง No-Arg หรือตัวสร้างที่ยอมรับอาร์กิวเมนต์ java.util.String
เดียว พารามิเตอร์การกำหนดค่านี้สามารถจัดหาได้โดยใช้ตัวเลือก gcpAccessTokenProviderConfig
หากไม่ได้ให้สิ่งนี้แล้วตัวสร้างที่ไม่มีการต่ออาร์กจะถูกเรียก ขวดที่มีการใช้งานควรอยู่ใน Classpath ของคลัสเตอร์ // Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
การเลียนแบบบัญชีบริการสามารถกำหนดค่าสำหรับชื่อผู้ใช้เฉพาะและชื่อกลุ่มหรือสำหรับผู้ใช้ทั้งหมดโดยค่าเริ่มต้นโดยใช้คุณสมบัติด้านล่าง:
gcpImpersonationServiceAccountForUser_<USER_NAME>
(ไม่ได้ตั้งค่าตามค่าเริ่มต้น)
การเลียนแบบบัญชีบริการสำหรับผู้ใช้เฉพาะ
gcpImpersonationServiceAccountForGroup_<GROUP_NAME>
(ไม่ได้ตั้งค่าตามค่าเริ่มต้น)
การแอบอ้างบัญชีบริการสำหรับกลุ่มเฉพาะ
gcpImpersonationServiceAccount
(ไม่ได้ตั้งค่าตามค่าเริ่มต้น)
การแอบอ้างบัญชีบริการเริ่มต้นสำหรับผู้ใช้ทั้งหมด
หากมีการตั้งค่าคุณสมบัติใด ๆ ข้างต้นบัญชีบริการที่ระบุจะถูกแอบอ้างโดยการสร้างข้อมูลรับรองระยะสั้นเมื่อเข้าถึง BigQuery
หากมีการตั้งค่าคุณสมบัติมากกว่าหนึ่งบัญชีบัญชีบริการที่เกี่ยวข้องกับชื่อผู้ใช้จะมีความสำคัญกว่าบัญชีบริการที่เกี่ยวข้องกับชื่อกลุ่มสำหรับผู้ใช้ที่จับคู่และกลุ่มซึ่งจะมีความสำคัญกว่าการเลียนแบบบัญชีบริการเริ่มต้น
สำหรับแอปพลิเคชันที่ง่ายกว่าที่ไม่จำเป็นต้องใช้การรีเฟรชโทเค็นทางเลือกอื่นคือการผ่านโทเค็นการเข้าถึงเป็นตัวเลือกการกำหนดค่า gcpAccessToken
คุณสามารถรับโทเค็นการเข้าถึงได้โดยเรียกใช้ gcloud auth application-default print-access-token
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<acccess-token>")
สิ่งสำคัญ: CredentialsProvider
และ AccessTokenProvider
จำเป็นต้องใช้ใน Java หรือภาษา JVM อื่น ๆ เช่น Scala หรือ Kotlin ขวดที่มีการใช้งานควรอยู่ใน Classpath ของคลัสเตอร์
ข้อสังเกต: ควรมีตัวเลือกใด ๆ ข้างต้นอย่างใดอย่างหนึ่ง
ในการเชื่อมต่อกับพร็อกซีไปข้างหน้าและเพื่อตรวจสอบสิทธิ์ของผู้ใช้ให้กำหนดค่าตัวเลือกต่อไปนี้
proxyAddress
: ที่อยู่ของพร็อกซีเซิร์ฟเวอร์ พร็อกซีจะต้องเป็นพร็อกซี HTTP และที่อยู่ควรอยู่ใน host:port
proxyUsername
: ชื่อผู้ใช้ที่ใช้เชื่อมต่อกับพร็อกซี
proxyPassword
: รหัสผ่านที่ใช้ในการเชื่อมต่อกับพร็อกซี
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
พารามิเตอร์พร็อกซีเดียวกันสามารถตั้งค่าได้ทั่วโลกโดยใช้ Runtimeconfig ของ Spark เช่นนี้:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
คุณสามารถตั้งค่าต่อไปนี้ในการกำหนดค่า Hadoop ได้เช่นกัน
fs.gs.proxy.address
(คล้ายกับ "proxyaddress"), fs.gs.proxy.username
(คล้ายกับ "proxyusername") และ fs.gs.proxy.password
(คล้ายกับ "proxypassword")
หากมีการตั้งค่าพารามิเตอร์เดียวกันในหลายสถานที่ลำดับของลำดับความสำคัญมีดังนี้:
ตัวเลือก ("คีย์", "ค่า")> spark.conf> การกำหนดค่า Hadoop