Azure Databricksクライアントライブラリは、Azure Databricks Rest APIを介してAzure Databricksワークスペースを自動化するための便利なインターフェイスを提供します。
このライブラリの実装は、REST APIバージョン2.0以上に基づいています。
マスターブランチはバージョン2用です。バージョン1.1(stable)はリリース/1.1ブランチにあります。
DataBricks REST APIにアクセスするには、個人的なアクセストークン(PAT)またはAzure Active Directoryトークン(AADトークン)が必要です。
REST API | バージョン | 説明 |
---|---|---|
クラスター | 2.0 | クラスターAPIを使用すると、クラスターを作成、開始、編集、リスト、終了、削除できます。 |
仕事 | 2.1 | Jobs APIを使用すると、Azure Databricksジョブをプログラム的に管理できます。 |
DBFS | 2.0 | DBFS APIは、ファイルを読むたびに資格情報を含めることなく、さまざまなデータソースと簡単にやり取りできるようにするDataBricks APIです。 |
秘密 | 2.0 | Secrets APIを使用すると、秘密、秘密のスコープ、アクセス権限を管理できます。 |
グループ | 2.0 | グループAPIでは、ユーザーのグループを管理できます。 |
ライブラリ | 2.0 | ライブラリAPIを使用すると、ライブラリをインストールおよびアンインストールし、クラスターにライブラリのステータスを取得できます。 |
トークン | 2.0 | トークンAPIを使用すると、Azure Databricks Rest APIを認証およびアクセスするために使用できるトークンを作成、リスト、および取り消しできます。 |
ワークスペース | 2.0 | ワークスペースAPIを使用すると、ノートブックとフォルダーをリスト、インポート、エクスポート、削除できます。 |
InstancePool | 2.0 | インスタンスプールAPIを使用すると、インスタンスプールを作成、編集、削除、リストできます。 |
権限 | 2.0 | Permissions APIを使用すると、トークン、クラスター、プール、ジョブ、デルタライブテーブルパイプライン、ノートブック、ディレクトリ、MLFLOW実験、MLFLOW登録モデル、SQLウェアハウス、レポ、クラスターポリシーのアクセス許可を管理できます。 |
クラスターポリシー | 2.0 | クラスターポリシーAPIを使用すると、クラスターポリシーを作成、リスト、編集できます。 |
グローバルINITスクリプト | 2.0 | グローバルINITスクリプトAPIにより、Azure Databricks管理者は、グローバルクラスター初期化スクリプトを安全で制御された方法で追加できます。 |
SQLウェアハウス | 2.0 | SQLウェアハウスAPIを使用すると、DataBricks SQL内のデータオブジェクトでSQLコマンドを実行できるコンピューティングリソースを管理できます。 |
レポス | 2.0 | Repos APIを使用すると、ユーザーはGit Reposを管理できます。ユーザーはAPIを使用して、アクセス許可を管理するすべてのリポジトリにアクセスできます。 |
パイプライン(デルタライブテーブル) | 2.0 | Delta Live Tables APIを使用すると、パイプラインの詳細を作成、編集、削除、開始、および表示できます。 |
より詳細な使用については、サンプルプロジェクトをご覧ください。
次の例では、 baseUrl
変数token
ワークスペースベースURLに設定する必要があります。これはhttps://adb-<workspace-id>.<random-number>.azuredatabricks.net
Databricks個人アクセストークン。
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
秘密の範囲を作成します
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
テキストの秘密を作成します
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
バイナリシークレットを作成します
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
clusters/create
、 jobs/run-now
およびjobs/runs/submit
APISサポートidempotencyトークン。リクエストの識別力を保証するためのオプションのトークンです。提供されたトークンが既に存在するリソース(クラスターまたは実行)が既に存在する場合、リクエストは新しいリソースを作成せず、代わりに既存のリソースのIDを返します。
iDempotencyトークンを指定した場合、障害時にリクエストが成功するまで再試行できます。 Databricksは、そのiDempotencyトークンで正確に1つのリソースが起動されることを保証します。
次のコードは、リクエストが失敗した場合にidempotency_token
でPollyを使用してリクエストを再試行する方法を示しています。
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
ライブラリのV2は、.NET 6ランタイムをターゲットにします。
Jobs APIは、REST APIのバージョン2.1に合わせて再設計されました。
以前のバージョンでは、Jobs APIはジョブごとの単一タスクのみをサポートしています。新しいJobs APIは、タスクがDAGとして表されるジョブごとの複数のタスクをサポートしています。
新しいバージョンは、さらに2種類のタスクをサポートしています。Pythonホイールタスクとデルタライブテーブルパイプラインタスクです。
このプロジェクトは、貢献と提案を歓迎します。ほとんどの貢献では、貢献者ライセンス契約(CLA)に同意する必要があります。詳細については、Microsoft Contributorライセンス契約(CLA)をご覧ください。
プルリクエストを送信すると、CLAボットはCLAを提供し、PRを適切に飾る必要があるかどうかを自動的に決定します(例:ラベル、コメント)。ボットが提供する指示に従うだけです。 CLAを使用して、すべてのレポでこれを1回だけ行う必要があります。
このプロジェクトは、Microsoftのオープンソース行動規範を採用しています。詳細については、FAQのコードを参照するか、追加の質問やコメントについては[email protected]にお問い合わせください。