Клиентская библиотека Azure Databricks предлагает удобный интерфейс для автоматизации вашего рабочего пространства Dazure Databricks через API Databricks Databricks Databricks.
Реализация этой библиотеки основана на REST API версии 2.0 и выше.
Главная филиал для версии 2. Версия 1.1 (стабильная) находится в филиале «Выпуски/1.1».
У вас должны быть личные токены доступа (PAT) или токены Azure Active Directory (Token), чтобы получить доступ к API DataBricks REST.
REST API | Версия | Описание |
---|---|---|
Кластеры | 2.0 | API кластеров позволяет создавать, запускать, редактировать, перечислить, прекращать и удалять кластеры. |
Рабочие места | 2.1 | API JOBS позволяет программно управлять заданиями Azure DataBricks. |
DBFS | 2.0 | API DBFS - это API DataBricks, которая позволяет проще взаимодействовать с различными источниками данных без необходимости включать ваши учетные данные каждый раз, когда вы читаете файл. |
Секреты | 2.0 | API Secrets позволяет управлять секретами, секретными областями и разрешениями доступа. |
Группа | 2.0 | API группы позволяет вам управлять группами пользователей. |
Библиотеки | 2.0 | API библиотеки позволяет устанавливать и удалять библиотеки и получить статус библиотек в кластере. |
Токен | 2.0 | Token API позволяет вам создавать, перечислить и отозвать токены, которые можно использовать для аутентификации и доступа к API Databricks Databricks. |
Рабочее пространство | 2.0 | API Workspace позволяет перечислять, импортировать, экспортировать и удалять ноутбуки и папки. |
Экземпляр | 2.0 | API API Pools Extance позволяет создавать, редактировать, удалять и перечислить пулы экземпляров. |
Разрешения | 2.0 | API разрешений позволяет вам управлять разрешениями для токена, кластера, пула, работы, Delta Live Tables Pipeline, ноутбука, каталога, эксперимента MLFLOW, зарегистрированной модели MLFLOW, политик SQL, репо и кластера. |
Кластерные политики | 2.0 | API Cluster Policies позволяет вам создавать, перечислить и редактировать политики кластера. |
Глобальные сценарии инициирования | 2.0 | API Global Init Scripts позволяет Azure Databricks Administrations добавлять сценарии инициализации глобальных кластеров безопасным и контролируемым образом. |
Склады SQL | 2.0 | API SQL Warehouses позволяет управлять вычислительными ресурсами, которые позволяют запускать команды SQL на объектах данных в DataBricks SQL. |
Репо | 2.0 | API RepoS позволяет пользователям управлять своими git Repos. Пользователи могут использовать API для доступа ко всем репозициям, которые они управляют разрешениями. |
Трубопроводы (Delta Live Tables) | 2.0 | Delta Live Tables API позволяет вам создавать, редактировать, удалять, запустить и просмотреть подробности о трубопроводах. |
Ознакомьтесь с образцом проекта для более подробных использования.
В следующих примерах переменная baseUrl
должна быть установлена token
URL-адрес базового пространства, который выглядит как https://adb-<workspace-id>.<random-number>.azuredatabricks.net
DataBricks личный токен доступа.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
Создание секретного масштаба
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
Создать текстовый секрет
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
Создать бинарный секрет
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
clusters/create
, jobs/run-now
и jobs/runs/submit
токен поддержки API-интерфейсов. Необязательно токен гарантировать идентичность запросов. Если ресурс (кластер или прогон) с предоставленным токеном уже существует, запрос не создает новый ресурс, а вместо этого возвращает идентификатор существующего ресурса.
Если вы указываете токен Idempotency, после сбоя вы сможете повторно повторить, пока запрос не достигнет успеха. DataBricks гарантирует, что именно один ресурс запускается с этим токеном Idempotency.
Следующий код иллюстрирует, как использовать Polly для повторения запроса с помощью idempotency_token
, если запрос не удастся.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
V2 библиотеки целевых .NET 6 Средства выполнения.
API рабочих мест был перепроектирован, чтобы соответствовать версии 2.1 REST API.
В предыдущей версии API Jobs поддерживает только одну задачу на работу. Новый API Jobs поддерживает несколько задач на работу, где задачи представлены как DAG.
Новая версия поддерживает еще два типа задачи: задача Python Wheel и Delta Live Tables Tables.
Этот проект приветствует вклады и предложения. Большинство взносов требуют, чтобы вы согласились с лицензионным соглашением о участнике (CLA), заявив, что вы имеете право и фактически предоставить нам права на использование вашего вклада. Для получения подробной информации, посетите лицензионное соглашение Microsoft Antrivor (CLA).
Когда вы отправляете запрос на привлечение, CLA-бот автоматически определит, нужно ли вам предоставить CLA и должным образом украсить PR (например, метка, комментарий). Просто следуйте инструкциям, предоставленным ботом. Вам нужно будет сделать это только один раз во всех репо, используя наш CLA.
Этот проект принял код поведения с открытым исходным кодом Microsoft. Для получения дополнительной информации см. Code of Perving FAQ или свяжитесь с [email protected] с любыми дополнительными вопросами или комментариями.