Azure Databricks客戶庫庫提供了一個方便的接口,用於通過Azure Databricks REST API自動化Azure Databricks工作區。
該庫的實現基於REST API 2.0及更高版本。
主分支是版本2。版本1.1(穩定)在版本/1.1分支中。
您必須具有個人訪問令牌(PAT)或Azure Active Directory標記(AAD令牌)才能訪問Databricks REST API。
REST API | 版本 | 描述 |
---|---|---|
集群 | 2.0 | 簇API允許您創建,啟動,編輯,列表,終止和刪除簇。 |
工作 | 2.1 | 作業API允許您通過編程方式管理Azure Databricks作業。 |
DBFS | 2.0 | DBFS API是一個數據鏈球api,它使與各種數據源進行交互變得易於互動,而無需每次讀取文件時都包含您的憑據。 |
秘密 | 2.0 | Secrets API允許您管理秘密,秘密範圍和訪問權限。 |
組 | 2.0 | 組API允許您管理用戶組。 |
庫 | 2.0 | 庫API允許您安裝和卸載庫,並在集群上獲取庫的狀態。 |
令牌 | 2.0 | 令牌API允許您創建,列出和撤銷可用於身份驗證和訪問Azure Databricks REST API的令牌。 |
工作區 | 2.0 | 工作區API允許您列出,導入,導出和刪除筆記本電腦和文件夾。 |
InstancePool | 2.0 | 實例池API允許您創建,編輯,刪除和列表實例池。 |
權限 | 2.0 | 權限API使您可以管理令牌,群集,池,作業,Delta Live Tables Pipeline,筆記本,目錄,MLFLOW實驗,MLFlow註冊模型,SQL倉庫,回購和集群策略的權限。 |
集群策略 | 2.0 | 集群策略API允許您創建,列出和編輯群集策略。 |
全局初始腳本 | 2.0 | 全局初始腳本API使Azure Databricks管理員以安全和控制的方式添加全局群集初始化腳本。 |
SQL倉庫 | 2.0 | SQL倉庫API允許您管理計算資源,使您可以在Databricks SQL中的數據對像上運行SQL命令。 |
存儲庫 | 2.0 | 存儲庫API允許用戶管理其GIT存儲庫。用戶可以使用API訪問其管理權限的所有存儲庫。 |
管道(三角洲現場表) | 2.0 | Delta Live Tables API允許您創建,編輯,刪除,啟動和查看有關管道的詳細信息。 |
查看示例項目以獲取更多詳細的用法。
在以下示例中,應將baseUrl
變量設置為工作區鹼URL,該token
看起來像https://adb-<workspace-id>.<random-number>.azuredatabricks.net
Databricks個人訪問令牌。
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
創建秘密範圍
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
創建文字秘密
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
創建二進制秘密
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
clusters/create
, jobs/run-now
和jobs/runs/submit
API支持IDEMPOTENCY令牌。保證請求的勢力是可選的令牌。如果已經存在帶有令牌的資源(集群或運行),則該請求不會創建新資源,而是返回現有資源的ID。
如果您指定了diDempotency代幣,則在失敗後,您可以重試,直到請求成功。 Databricks保證使用該勢力令牌啟動一個資源。
以下代碼說明瞭如何使用Polly使用idempotency_token
重試該請求(如果請求失敗)。
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
庫的V2目標.NET 6運行時。
重新設計了Jobs API,以與REST API的2.1版對齊。
在上一個版本中,作業API僅支持每個作業的單個任務。新的作業API支持每個作業的多個任務,其中任務表示為DAG。
新版本支持另外兩種類型的任務:Python Wheel任務和Delta Live Tables Pipeline Task。
該項目歡迎貢獻和建議。大多數捐款要求您同意撰寫貢獻者許可協議(CLA),宣布您有權並實際上授予我們使用您的貢獻的權利。有關詳細信息,請訪問Microsoft貢獻者許可協議(CLA)。
當您提交拉動請求時,CLA機器人將自動確定您是否需要提供CLA並適當裝飾PR(例如,標籤,評論)。只需按照機器人提供的說明即可。您只需要使用我們的CLA在所有存儲庫中進行一次。
該項目採用了Microsoft開源的行為代碼。有關更多信息,請參見《行為守則常見問題守則》或與其他問題或評論聯繫[email protected]。