A Biblioteca Cliente do Azure Databricks oferece uma interface conveniente para automatizar o espaço de trabalho do Azure Databricks através da API REST do Azure Databricks.
A implementação desta biblioteca é baseada na API Rest versão 2.0 e acima.
A filial principal é para a versão 2. Versão 1.1 (estável) está na ramificação de liberações/1.1.
Você deve ter tokens de acesso pessoal (PAT) ou tokens do Azure Active Directory (token AAD) para acessar a API REST DATABRICKS.
Rest API | Versão | Descrição |
---|---|---|
Clusters | 2.0 | A API de clusters permite criar, iniciar, editar, listar, encerrar e excluir clusters. |
Empregos | 2.1 | A API de Jobs permite gerenciar programaticamente os trabalhos do Azure Databricks. |
Dbfs | 2.0 | A API do DBFS é uma API do Databricks que simplifica interagir com várias fontes de dados sem precisar incluir suas credenciais toda vez que você ler um arquivo. |
Segredos | 2.0 | A API de segredos permite gerenciar segredos, escopos secretos e permissões de acesso. |
Grupos | 2.0 | A API de grupos permite gerenciar grupos de usuários. |
Bibliotecas | 2.0 | A API das bibliotecas permite que você instale e desinstale as bibliotecas e obtenha o status das bibliotecas em um cluster. |
Token | 2.0 | A API de token permite criar, listar e revogar tokens que podem ser usados para autenticar e acessar as APIs REST do Azure Databricks. |
Espaço de trabalho | 2.0 | A API do espaço de trabalho permite listar, importar, exportar e excluir notebooks e pastas. |
InstancePool | 2.0 | A API de Pools de instância permite criar, editar, excluir e listar pools de instância. |
Permissões | 2.0 | A API de permissões permite gerenciar permissões para token, cluster, piscina, emprego, pipeline de tabelas LIVE Delta, notebook, diretório, experimento de mlflow, modelo registrado em mlflow, armazém SQL, repo e políticas de cluster. |
Políticas de cluster | 2.0 | A API de políticas de cluster permite criar, listar e editar políticas de cluster. |
Scripts iniciados globais | 2.0 | A API global de scripts inits permite que os administradores do Azure Databricks adicionem scripts globais de inicialização de cluster de maneira segura e controlada. |
Armazéns SQL | 2.0 | A API SQL Warehouses permite gerenciar recursos de computação que permitem executar comandos SQL em objetos de dados no Databricks SQL. |
Repos | 2.0 | A API do Repos permite que os usuários gerenciem seus repositórios Git. Os usuários podem usar a API para acessar todos os repositórios nos quais gerenciam permissões. |
Pipelines (Delta Live Tables) | 2.0 | A API da Delta Live Tables permite criar, editar, excluir, iniciar e visualizar detalhes sobre pipelines. |
Confira o projeto de amostra para obter usos mais detalhados.
Nos exemplos a seguir, a variável baseUrl
deve ser definida como o URL da base do espaço de trabalho, que se parece com https://adb-<workspace-id>.<random-number>.azuredatabricks.net
, e a variável token
deve ser definida para o seu Databricks Token de acesso pessoal.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
Criando escopo secreto
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
Criar text segredo
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
Crie segredo binário
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
Os clusters/create
, jobs/run-now
e jobs/runs/submit
o Token de Idempotência de Suporte APIs. É um token opcional garantir a idempotência das solicitações. Se um recurso (um cluster ou uma execução) com o token fornecido já existir, a solicitação não cria um novo recurso, mas retorna o ID do recurso existente.
Se você especificar o token da idempotência, após a falha, poderá tentar novamente até que a solicitação seja bem -sucedida. O Databricks garante que exatamente um recurso seja lançado com esse token idempotência.
O código a seguir ilustra como usar Polly para tentar novamente a solicitação com idempotency_token
se a solicitação falhar.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
O V2 da biblioteca tem como alvo .NET 6 Tempo de execução.
A API de Jobs foi redesenhada para se alinhar com a versão 2.1 da API REST.
Na versão anterior, a API de Jobs suporta apenas uma única tarefa por trabalho. A nova API de Jobs suporta várias tarefas por trabalho, onde as tarefas são representadas como um DAG.
A nova versão suporta mais dois tipos de tarefas: tarefa de roda Python e tarefa de pipeline de tabelas Delta Live Tables.
Este projeto recebe contribuições e sugestões. A maioria das contribuições exige que você concorde com um Contrato de Licença de Colaborador (CLA) declarando que você tem o direito e, na verdade, concede -nos os direitos de usar sua contribuição. Para detalhes, visite o Contrato de Licença de Colaborador da Microsoft (CLA).
Quando você envia uma solicitação de tração, um CLA-bot determina automaticamente se você precisa fornecer um CLA e decorar o PR adequadamente (por exemplo, etiqueta, comentário). Simplesmente siga as instruções fornecidas pelo bot. Você só precisará fazer isso uma vez em todos os repositórios usando nosso CLA.
Este projeto adotou o Código de Conduta Open Microsoft. Para obter mais informações, consulte o Código de Conduta Perguntas frequentes ou entre em contato com [email protected] com quaisquer perguntas ou comentários adicionais.