Perpustakaan Klien Azure DataBricks menawarkan antarmuka yang nyaman untuk mengotomatisasi ruang kerja databricks Azure Anda melalui AZURE DataBricks REST API.
Implementasi perpustakaan ini didasarkan pada REST API versi 2.0 dan di atas.
Cabang master adalah untuk versi 2. Versi 1.1 (stabil) ada di cabang rilis/1.1.
Anda harus memiliki Token Akses Pribadi (PAT) atau Token Direktori Aktif Azure (Token AAD) untuk mengakses API REST Databricks.
API istirahat | Versi | Keterangan |
---|---|---|
Cluster | 2.0 | CLUSTERS API memungkinkan Anda untuk membuat, memulai, mengedit, mendaftar, mengakhiri, dan menghapus cluster. |
Pekerjaan | 2.1 | API Jobs memungkinkan Anda untuk mengelola pekerjaan databricks Azure secara terprogram. |
DBFS | 2.0 | API DBFS adalah API databricks yang membuatnya mudah untuk berinteraksi dengan berbagai sumber data tanpa harus memasukkan kredensial Anda setiap kali Anda membaca file. |
Rahasia | 2.0 | API Rahasia memungkinkan Anda untuk mengelola rahasia, lingkup rahasia, dan izin akses. |
Kelompok | 2.0 | API Grup memungkinkan Anda untuk mengelola grup pengguna. |
Perpustakaan | 2.0 | API Perpustakaan memungkinkan Anda untuk menginstal dan menghapus pustaka dan mendapatkan status pustaka pada sebuah cluster. |
Token | 2.0 | Token API memungkinkan Anda membuat, mendaftar, dan mencabut token yang dapat digunakan untuk mengotentikasi dan mengakses Azure DataBricks REST API. |
Ruang kerja | 2.0 | Workspace API memungkinkan Anda untuk mendaftar, mengimpor, mengekspor, dan menghapus notebook dan folder. |
Instancepool | 2.0 | Instance Pools API memungkinkan Anda untuk membuat, mengedit, menghapus, dan daftar kumpulan instance. |
Izin | 2.0 | API Izin memungkinkan Anda mengelola izin untuk token, cluster, pool, pekerjaan, pipa Tabel Langsung Delta, notebook, direktori, eksperimen MLFLOW, model terdaftar MLFLOW, kebijakan SQL Warehouse, repo dan cluster. |
Kebijakan cluster | 2.0 | API Kebijakan Cluster memungkinkan Anda membuat, mendaftar, dan mengedit kebijakan cluster. |
Skrip init global | 2.0 | API Global Init Scripts memungkinkan administrator databricks Azure menambahkan skrip inisialisasi cluster global dengan cara yang aman dan terkontrol. |
Gudang SQL | 2.0 | API Gudang SQL memungkinkan Anda untuk mengelola sumber daya komputasi yang memungkinkan Anda menjalankan perintah SQL pada objek data dalam databricks SQL. |
Repo | 2.0 | API Repo memungkinkan pengguna untuk mengelola repo git mereka. Pengguna dapat menggunakan API untuk mengakses semua repo yang telah mereka kelola izin. |
Pipa (Tabel Langsung Delta) | 2.0 | Delta Live Tables API memungkinkan Anda membuat, mengedit, menghapus, memulai, dan melihat detail tentang pipa. |
Lihatlah proyek sampel untuk penggunaan yang lebih rinci.
Dalam contoh-contoh berikut, variabel baseUrl
harus diatur ke URL basis ruang kerja, yang terlihat seperti https://adb-<workspace-id>.<random-number>.azuredatabricks.net
, dan variabel token
harus diatur ke Anda Databricks Token Akses Pribadi.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
Menciptakan ruang lingkup rahasia
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
Buat rahasia teks
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
Buat rahasia biner
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
clusters/create
, jobs/run-now
dan jobs/runs/submit
Token Idempotency Dukungan API. Ini adalah token opsional untuk menjamin idempotensi permintaan. Jika sumber daya (cluster atau run) dengan token yang disediakan sudah ada, permintaan tidak membuat sumber daya baru tetapi mengembalikan ID dari sumber daya yang ada sebagai gantinya.
Jika Anda menentukan token idempotensi, setelah kegagalan Anda dapat mencoba lagi sampai permintaan berhasil. DataBricks menjamin bahwa tepat satu sumber daya diluncurkan dengan token idempotensi itu.
Kode berikut menggambarkan cara menggunakan Polly untuk mencoba lagi permintaan dengan idempotency_token
jika permintaan gagal.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
V2 dari perpustakaan menargetkan .NET 6 Runtime.
API Jobs didesain ulang untuk menyelaraskan dengan versi 2.1 dari API REST.
Di versi sebelumnya, API Jobs hanya mendukung tugas tunggal per pekerjaan. API Jobs baru mendukung banyak tugas per pekerjaan, di mana tugas -tugas tersebut diwakili sebagai DAG.
Versi baru ini mendukung dua jenis tugas lagi: Tugas Roda Python dan Tugas Pipa Tabel Langsung Delta.
Proyek ini menyambut kontribusi dan saran. Sebagian besar kontribusi mengharuskan Anda untuk menyetujui perjanjian lisensi kontributor (CLA) yang menyatakan bahwa Anda memiliki hak untuk, dan benar -benar melakukannya, beri kami hak untuk menggunakan kontribusi Anda. Untuk detailnya, kunjungi Perjanjian Lisensi Kontributor Microsoft (CLA).
Saat Anda mengirimkan permintaan tarik, CLA-bot akan secara otomatis menentukan apakah Anda perlu memberikan CLA dan menghiasi PR secara tepat (misalnya, label, komentar). Cukup ikuti instruksi yang disediakan oleh bot. Anda hanya perlu melakukan ini sekali di semua repo menggunakan CLA kami.
Proyek ini telah mengadopsi kode perilaku open source Microsoft. Untuk informasi lebih lanjut, lihat FAQ Kode Perilaku atau hubungi [email protected] dengan pertanyaan atau komentar tambahan.