ไลบรารีไคลเอนต์ Azure Databricks นำเสนออินเทอร์เฟซที่สะดวกสำหรับการทำงาน Azure Databricks Workspace ของคุณผ่าน Azure Databricks REST API API
การใช้งานไลบรารีนี้ขึ้นอยู่กับ REST API เวอร์ชัน 2.0 ขึ้นไป
สาขาหลักสำหรับรุ่น 2 เวอร์ชัน 1.1 (เสถียร) อยู่ในรุ่น/1.1 สาขา
คุณต้องมีโทเค็นการเข้าถึงส่วนบุคคล (PAT) หรือ Azure Active Directory Tokens (AAD Token) เพื่อเข้าถึง Databricks REST API
REST API | รุ่น | คำอธิบาย |
---|---|---|
กลุ่ม | 2.0 | กลุ่ม API ช่วยให้คุณสร้างเริ่มต้นแก้ไขรายการยุติและลบกลุ่ม |
งาน | 2.1 | Jobs API ช่วยให้คุณจัดการงาน Azure Databricks โดยทางโปรแกรม |
DBFS | 2.0 | DBFS API เป็น Databricks API ที่ทำให้การโต้ตอบกับแหล่งข้อมูลต่าง ๆ โดยไม่ต้องรวมข้อมูลประจำตัวของคุณทุกครั้งที่คุณอ่านไฟล์ |
ความลับ | 2.0 | SECRETS API ช่วยให้คุณจัดการความลับขอบเขตความลับและสิทธิ์การเข้าถึง |
กลุ่ม | 2.0 | กลุ่ม API ช่วยให้คุณจัดการกลุ่มผู้ใช้ |
ห้องสมุด | 2.0 | Libraries API ช่วยให้คุณสามารถติดตั้งและถอนการติดตั้งไลบรารีและรับสถานะของไลบรารีบนคลัสเตอร์ |
โทเค็น | 2.0 | โทเค็น API ช่วยให้คุณสร้างรายการและเพิกถอนโทเค็นที่สามารถใช้ในการตรวจสอบสิทธิ์และเข้าถึง Azure Databricks REST APIs |
พื้นที่ทำงาน | 2.0 | Workspace API ช่วยให้คุณสามารถแสดงรายการนำเข้าส่งออกและลบโน้ตบุ๊กและโฟลเดอร์ |
อินสแตนซ์พูล | 2.0 | พูลอินสแตนซ์ API ช่วยให้คุณสร้างแก้ไขลบและแสดงรายการพูลอินสแตนซ์ |
การอนุญาต | 2.0 | Permissions API ช่วยให้คุณจัดการสิทธิ์สำหรับโทเค็น, คลัสเตอร์, พูล, งาน, เดลต้า Live Tables Pipeline, สมุดบันทึก, ไดเรกทอรี, การทดลอง MLFlow, โมเดลที่ลงทะเบียนของ MLFLOW, SQL Warehouse, นโยบายการซื้อคืนและคลัสเตอร์ |
นโยบายคลัสเตอร์ | 2.0 | นโยบายคลัสเตอร์ API ช่วยให้คุณสร้างรายการและแก้ไขนโยบายคลัสเตอร์ |
สคริปต์สคริปต์เริ่มต้นทั่วโลก | 2.0 | สคริปต์ INIT ทั่วโลกช่วยให้ผู้ดูแลระบบ Azure Databricks เพิ่มสคริปต์การเริ่มต้นคลัสเตอร์ทั่วโลกในลักษณะที่ปลอดภัยและควบคุมได้ |
คลังสินค้า SQL | 2.0 | SQL Warehouses API ช่วยให้คุณจัดการทรัพยากรการคำนวณที่ให้คุณเรียกใช้คำสั่ง SQL บนวัตถุข้อมูลภายใน Databricks SQL |
repos | 2.0 | REPOS API ช่วยให้ผู้ใช้สามารถจัดการ GIT repos ของพวกเขา ผู้ใช้สามารถใช้ API เพื่อเข้าถึง repos ทั้งหมดที่พวกเขามีการจัดการสิทธิ์ |
Pipelines (Delta Live Tables) | 2.0 | Delta Live Tables API ช่วยให้คุณสร้างแก้ไขลบเริ่มต้นและดูรายละเอียดเกี่ยวกับท่อ |
ตรวจสอบโครงการตัวอย่างสำหรับการใช้งานโดยละเอียดเพิ่มเติม
ในตัวอย่างต่อไปนี้ควรตั้งค่าตัวแปร baseUrl
เป็น URL ฐานเวิร์กสเป token
ซึ่งดูเหมือนว่า https://adb-<workspace-id>.<random-number>.azuredatabricks.net
โทเค็นการเข้าถึงส่วนบุคคลของ Databricks
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
การสร้างขอบเขตความลับ
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
สร้างความลับข้อความ
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
สร้างความลับแบบไบนารี
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
clusters/create
jobs/run-now
และ jobs/runs/submit
โทเค็น idempotency สนับสนุน APIs เป็นโทเค็นทางเลือกในการรับประกันความผิดปกติของคำขอ หากทรัพยากร (คลัสเตอร์หรือการรัน) ที่มีโทเค็นที่มีอยู่แล้วมีอยู่แล้วคำขอจะไม่สร้างทรัพยากรใหม่ แต่ส่งคืน ID ของทรัพยากรที่มีอยู่แทน
หากคุณระบุโทเค็น idempotency เมื่อล้มเหลวคุณสามารถลองอีกครั้งจนกว่าคำขอจะสำเร็จ Databricks รับประกันได้ว่ามีการเปิดตัวทรัพยากรหนึ่งอย่างด้วยโทเค็น idempotency นั้น
รหัสต่อไปนี้แสดงวิธีการใช้ Polly เพื่อลองคำขอด้วย idempotency_token
หากคำขอล้มเหลว
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
V2 ของ Library เป้าหมาย. NET 6 Runtime
Jobs API ได้รับการออกแบบใหม่ให้สอดคล้องกับเวอร์ชัน 2.1 ของ REST API
ในเวอร์ชันก่อนหน้า Jobs API รองรับงานเดียวต่องานเท่านั้น งานใหม่ API รองรับงานหลายงานต่องานโดยที่งานจะแสดงเป็น DAG
เวอร์ชันใหม่รองรับงานอีกสองประเภท: งาน Python Wheel และ Delta Live Tables Pipeline
โครงการนี้ยินดีต้อนรับการมีส่วนร่วมและข้อเสนอแนะ การมีส่วนร่วมส่วนใหญ่กำหนดให้คุณต้องยอมรับข้อตกลงใบอนุญาตผู้มีส่วนร่วม (CLA) ประกาศว่าคุณมีสิทธิ์และทำจริงให้สิทธิ์ในการใช้การบริจาคของคุณ สำหรับรายละเอียดเยี่ยมชมข้อตกลงใบอนุญาตผู้สนับสนุน Microsoft (CLA)
เมื่อคุณส่งคำขอดึง CLA-bot จะพิจารณาโดยอัตโนมัติว่าคุณจำเป็นต้องจัดเตรียม CLA และตกแต่ง PR อย่างเหมาะสม (เช่นฉลาก, ความคิดเห็น) เพียงทำตามคำแนะนำที่จัดทำโดยบอท คุณจะต้องทำสิ่งนี้เพียงครั้งเดียวใน repos ทั้งหมดโดยใช้ CLA ของเรา
โครงการนี้ได้นำรหัสการดำเนินงานของ Microsoft โอเพ่นซอร์สมาใช้ สำหรับข้อมูลเพิ่มเติมโปรดดูจรรยาบรรณคำถามที่พบบ่อยหรือติดต่อ [email protected] พร้อมคำถามหรือความคิดเห็นเพิ่มเติมใด ๆ