Azure Databricks Client Library는 Azure Databricks REST API를 통해 Azure Databricks 작업 영역을 자동화하기위한 편리한 인터페이스를 제공합니다.
이 라이브러리의 구현은 REST API 버전 2.0 이상을 기반으로합니다.
마스터 브랜치는 버전 2입니다. 버전 1.1 (stable)은 릴리스/1.1 분기에 있습니다.
Databricks REST API에 액세스하려면 개인 액세스 토큰 (PAT) 또는 ADORE Active Directory Tokens (AAD Token)가 있어야합니다.
휴식 API | 버전 | 설명 |
---|---|---|
클러스터 | 2.0 | 클러스터 API를 사용하면 클러스터를 작성, 시작, 편집, 목록, 종료 및 삭제할 수 있습니다. |
일자리 | 2.1 | Jobs API를 사용하면 Azure Databricks 작업을 프로그래밍 방식으로 관리 할 수 있습니다. |
DBFS | 2.0 | DBFS API는 파일을 읽을 때마다 자격 증명을 포함시키지 않고도 다양한 데이터 소스와 간단하게 상호 작용할 수있는 Databricks API입니다. |
기미 | 2.0 | 비밀 API를 사용하면 비밀, 비밀 범위 및 액세스 권한을 관리 할 수 있습니다. |
여러 떼 | 2.0 | 그룹 API를 사용하면 사용자 그룹을 관리 할 수 있습니다. |
도서관 | 2.0 | 라이브러리 API를 사용하면 라이브러리를 설치하고 제거하고 클러스터에서 라이브러리 상태를 얻을 수 있습니다. |
토큰 | 2.0 | Token API를 사용하면 Azure Databricks REST API를 인증하고 액세스하는 데 사용할 수있는 토큰을 작성, 목록 및 취소 할 수 있습니다. |
작업 공간 | 2.0 | 작업 공간 API를 사용하면 노트북 및 폴더를 나열, 가져 오기, 내보내기 및 삭제할 수 있습니다. |
인스턴스 풀 | 2.0 | 인스턴스 Pools API를 사용하면 인스턴스 풀을 작성, 편집, 삭제 및 목록 할 수 있습니다. |
권한 | 2.0 | 권한 API를 사용하면 토큰, 클러스터, 풀, 작업, 델타 라이브 테이블 파이프 라인, 노트북, 디렉토리, MLFLOW 실험, MLFLOW 등록 모델, SQL 창고, 레포 및 클러스터 정책에 대한 권한을 관리 할 수 있습니다. |
클러스터 정책 | 2.0 | 클러스터 정책 API를 사용하면 클러스터 정책을 작성, 나열 및 편집 할 수 있습니다. |
글로벌 초기 스크립트 | 2.0 | Global Init Scripts API를 사용하면 Azure Databricks 관리자는 안전하고 제어 된 방식으로 글로벌 클러스터 초기화 스크립트를 추가 할 수 있습니다. |
SQL 창고 | 2.0 | SQL 창고 API를 사용하면 Databricks SQL 내 데이터 객체에서 SQL 명령을 실행할 수있는 컴퓨팅 리소스를 관리 할 수 있습니다. |
레포지스 | 2.0 | REPOS API를 사용하면 사용자가 GIT 리포를 관리 할 수 있습니다. 사용자는 API를 사용하여 권한을 관리하는 모든 저장소에 액세스 할 수 있습니다. |
파이프 라인 (델타 라이브 테이블) | 2.0 | Delta Live Tables API를 사용하면 파이프 라인에 대한 세부 정보를 작성, 편집, 삭제, 시작 및 볼 수 있습니다. |
보다 자세한 사용법은 샘플 프로젝트를 확인하십시오.
다음 예제에서는 baseUrl
변수 token
https://adb-<workspace-id>.<random-number>.azuredatabricks.net
Databricks 개인 액세스 토큰.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
비밀 범위 생성
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
텍스트 비밀을 만듭니다
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
이진 비밀을 만듭니다
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
clusters/create
, jobs/run-now
및 jobs/runs/submit
API 지원 Idempotency 토큰. 요청의 Idempotency를 보장하는 것은 선택 사항입니다. 제공된 토큰이있는 리소스 (클러스터 또는 실행)가 이미 존재하는 경우 요청은 새 리소스를 생성하지 않고 대신 기존 리소스의 ID를 반환합니다.
Idempotency 토큰을 지정하면 실패시 요청이 성공할 때까지 다시 시도 할 수 있습니다. Databricks는 해당 Idempotency 토큰으로 정확히 하나의 리소스가 시작되도록 보장합니다.
다음 코드는 폴리를 사용하여 요청이 실패하면 idempotency_token
으로 요청을 다시 시도하는 방법을 보여줍니다.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
라이브러리의 V2는 .NET 6 런타임을 대상으로합니다.
Jobs API는 REST API의 버전 2.1과 정렬되도록 재 설계되었습니다.
이전 버전에서 Jobs API는 작업 당 단일 작업 만 지원합니다. 새로운 Jobs API는 작업 당 여러 작업을 지원하며 작업은 DAG로 표시됩니다.
새 버전은 Python Wheel Task와 Delta Live Tables 파이프 라인 작업의 두 가지 유형의 작업을 지원합니다.
이 프로젝트는 기여와 제안을 환영합니다. 대부분의 기부금은 귀하가 귀하가 귀하의 기부금을 사용할 권리를 부여 할 권리가 있다고 선언하는 기고자 라이센스 계약 (CLA)에 동의해야합니다. 자세한 내용은 Microsoft 기고자 라이센스 계약 (CLA)을 방문하십시오.
풀 요청을 제출할 때 CLA-BOT은 CLA를 제공하고 PR을 적절하게 장식 해야하는지 자동으로 결정합니다 (예 : 레이블, 댓글). 봇이 제공 한 지침을 따르십시오. CLA를 사용하여 모든 저장소에서 한 번만이 작업을 수행하면됩니다.
이 프로젝트는 Microsoft 오픈 소스 행동 강령을 채택했습니다. 자세한 내용은 추가 질문이나 의견이 있으면 행동 강령 FAQ 또는 [email protected]에 문의하십시오.