La Biblioteca de clientes de Azure Databricks ofrece una interfaz conveniente para automatizar su espacio de trabajo de Azure Databricks a través de Azure Databricks REST API.
La implementación de esta biblioteca se basa en REST API versión 2.0 y superior.
La rama maestra es para la versión 2. Versión 1.1 (estable) está en la rama de lanzamientos/1.1.
Debe tener tokens de acceso personal (PAT) o tokens Azure Active Directory (TOKEN AAD) para acceder a la API REST de Databricks.
API REST | Versión | Descripción |
---|---|---|
Racimos | 2.0 | La API de Clusters le permite crear, iniciar, editar, enumerar, terminar y eliminar grupos. |
Trabajos | 2.1 | La API de trabajos le permite administrar programáticamente los trabajos de Databricks de Azure. |
DBFS | 2.0 | La API DBFS es una API de Databricks que hace que sea fácil interactuar con varias fuentes de datos sin tener que incluir sus credenciales cada vez que lea un archivo. |
Misterios | 2.0 | La API Secrets le permite administrar secretos, ámbitos secretos y permisos de acceso. |
Grupos | 2.0 | La API de grupos le permite administrar grupos de usuarios. |
Bibliotecas | 2.0 | La API de las bibliotecas le permite instalar y desinstalar bibliotecas y obtener el estado de las bibliotecas en un clúster. |
Simbólico | 2.0 | La API de token le permite crear, enumerar y revocar tokens que se pueden usar para autenticar y acceder a Azure Databricks REST API. |
Espacio de trabajo | 2.0 | La API del espacio de trabajo le permite enumerar, importar, exportar y eliminar cuadernos y carpetas. |
Instancia | 2.0 | La API de Pools de instancias le permite crear, editar, eliminar y enumerar los grupos de instancias. |
Permisos | 2.0 | La API de permisos le permite administrar los permisos para token, clúster, piscina, trabajo, tubería de tablas en vivo delta, cuaderno, directorio, experimento mlflow, modelo registrado de mlflow, póliza de almacén SQL, repo y clúster. |
Políticas de clúster | 2.0 | La API de políticas de clúster le permite crear, enumerar y editar políticas de clúster. |
Global Init Scripts | 2.0 | La API Global Init Scripts permite a los administradores de Databricks de Azure agregan scripts de inicialización de clúster global de manera segura y controlada. |
Almacenes SQL | 2.0 | La API SQL Warehouses le permite administrar recursos de cálculo que le permite ejecutar comandos SQL en objetos de datos dentro de Databricks SQL. |
Repositar | 2.0 | La API Repos permite a los usuarios administrar sus repositorios GIT. Los usuarios pueden usar la API para acceder a todos los repositorios en los que han administrado los permisos. |
Tuberías (tablas de Delta Live) | 2.0 | La API de Tablas Live Delta le permite crear, editar, eliminar, iniciar y ver detalles sobre las tuberías. |
Consulte el proyecto de muestra para obtener usos más detallados.
En los siguientes ejemplos, la variable baseUrl
debe establecerse en la URL base del espacio de trabajo, que parece https://adb-<workspace-id>.<random-number>.azuredatabricks.net
, y la variable token
debe configurarse en su DataBricks Token de acceso personal.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
Creando alcance secreto
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
Crear secreto de texto
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
Crear secreto binario
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
Los clusters/create
, jobs/run-now
y jobs/runs/submit
el token de idempotencia de soporte de APIS. Es token opcional para garantizar la idempotencia de las solicitudes. Si ya existe un recurso (un clúster o una ejecución) con el token proporcionado, la solicitud no crea un nuevo recurso, sino que devuelve la identificación del recurso existente.
Si especifica el token de idempotencia, al fallar puede volver a intentar hasta que la solicitud tenga éxito. Databricks garantiza que exactamente un recurso se lance con ese token idempotencia.
El siguiente código ilustra cómo usar Polly para volver a intentar la solicitud con idempotency_token
si la solicitud falla.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
El V2 de la biblioteca se dirige al tiempo de ejecución .NET 6.
La API de Jobs fue rediseñada para alinearse con la versión 2.1 de la API REST.
En la versión anterior, la API de trabajos solo admite una sola tarea por trabajo. La nueva API de trabajos admite múltiples tareas por trabajo, donde las tareas se representan como un DAG.
La nueva versión admite dos tipos más de tareas: tarea de la rueda de Python y tarea de tubería de tablas Live Delta.
Este proyecto da la bienvenida a las contribuciones y sugerencias. La mayoría de las contribuciones requieren que acepte un Acuerdo de Licencia de Contributor (CLA) que declare que tiene derecho y realmente hacernos los derechos para utilizar su contribución. Para más detalles, visite el Acuerdo de licencia de contribuyente de Microsoft (CLA).
Cuando envíe una solicitud de extracción, un bote CLA determinará automáticamente si necesita proporcionar un CLA y decorar el PR de manera apropiada (por ejemplo, etiqueta, comentario). Simplemente siga las instrucciones proporcionadas por el bot. Solo necesitará hacer esto una vez en todos los reposos usando nuestro CLA.
Este proyecto ha adoptado el Código de Conducta Open Open Microsoft. Para obtener más información, consulte el Código de Conducta Preguntas frecuentes o comuníquese con [email protected] con cualquier pregunta o comentario adicional.