La bibliothèque client Azure Databricks offre une interface pratique pour automatiser votre espace de travail Azure Databricks via l'API Azure Databricks REST.
L'implémentation de cette bibliothèque est basée sur l'API REST version 2.0 et supérieure.
La branche maître est pour la version 2. La version 1.1 (stable) est dans la branche des versions / 1.1.
Vous devez avoir des jetons d'accès personnels (PAT) ou des jetons Azure Active Directory (token AAD) pour accéder à l'API REST DATABRICKS.
API REST | Version | Description |
---|---|---|
Groupes | 2.0 | L'API des clusters vous permet de créer, démarrer, modifier, liste, terminer et supprimer les clusters. |
Emplois | 2.1 | L'API Jobs vous permet de gérer programmatiques des travaux Azure Databricks. |
DBFS | 2.0 | L'API DBFS est une API Databricks qui facilite l'interaction avec diverses sources de données sans avoir à inclure vos informations d'identification chaque fois que vous lisez un fichier. |
Secrets | 2.0 | L'API Secrets vous permet de gérer les secrets, les lunettes secrètes et les autorisations d'accès. |
Groupes | 2.0 | L'API des groupes vous permet de gérer des groupes d'utilisateurs. |
Bibliothèques | 2.0 | L'API des bibliothèques vous permet d'installer et de désinstaller des bibliothèques et d'obtenir l'état des bibliothèques sur un cluster. |
Jeton | 2.0 | L'API de jeton vous permet de créer, de répertorier et de révoquer les jetons qui peuvent être utilisés pour authentifier et accéder aux API de repos de données Azure. |
Espace de travail | 2.0 | L'API de l'espace de travail vous permet d'énumérer, d'importer, d'exporter et de supprimer les ordinateurs portables et les dossiers. |
Instancepool | 2.0 | L'API des pools d'instance vous permet de créer, modifier, supprimer et répertorier les pools d'instance. |
Autorisation | 2.0 | L'API des autorisations vous permet de gérer les autorisations pour les jetons, le cluster, le pool, le travail, le pipeline de tables en direct delta, le cahier, le répertoire, l'expérience MLFlow, le modèle enregistré MLFlow, l'entrepôt SQL, les politiques de réapprovisionnement et de cluster. |
Politiques de cluster | 2.0 | L'API des politiques de cluster vous permet de créer, de répertorier et de modifier les politiques de cluster. |
Scripts mondiaux init | 2.0 | L'API Global Init Scripts permet aux administrateurs Azure Databricks ajouter des scripts d'initialisation globale de cluster de manière sécurisée et contrôlée. |
Entrepôts SQL | 2.0 | L'API SQL Warehouses vous permet de gérer des ressources de calcul qui vous permettent d'exécuter des commandes SQL sur des objets de données dans Databricks SQL. |
Se soustraire | 2.0 | L'API REPOS permet aux utilisateurs de gérer leurs repos GIT. Les utilisateurs peuvent utiliser l'API pour accéder à tous les dépositions sur lesquelles ils ont géré les autorisations. |
Pipelines (tables en direct delta) | 2.0 | L'API Delta Live Tables vous permet de créer, modifier, supprimer, démarrer et afficher les détails sur les pipelines. |
Consultez l'exemple de projet pour des usages plus détaillés.
Dans les exemples suivants, la variable baseUrl
doit être définie sur l'URL de base de l'espace de travail, qui ressemble à https://adb-<workspace-id>.<random-number>.azuredatabricks.net
, et la variable token
doit être définie sur votre Token d'accès personnel de Databricks.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
Créer une portée secrète
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
Créer un secret de texte
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
Créer un secret binaire
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
Les clusters/create
, jobs/run-now
et jobs/runs/submit
le jeton d'identification des API de support. Il s'agit d'un jeton facultatif pour garantir l'idémpotence des demandes. Si une ressource (un cluster ou une exécution) avec le jeton fourni existe déjà, la demande ne crée pas de nouvelle ressource mais renvoie l'ID de la ressource existante à la place.
Si vous spécifiez le jeton Idempotence, en cas de défaillance, vous pouvez réessayer jusqu'à ce que la demande réussit. Databricks garantit que exactement une ressource est lancée avec ce jeton d'identification.
Le code suivant illustre comment utiliser Polly pour réessayer la demande avec idempotency_token
en cas d'échec de la demande.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
Le V2 de la bibliothèque cible .NET 6 Runtime.
L'API Jobs a été redessinée pour s'aligner sur la version 2.1 de l'API REST.
Dans la version précédente, l'API Jobs ne prend en charge que la tâche par emploi. L'API New Jobs prend en charge plusieurs tâches par emploi, où les tâches sont représentées comme un DAG.
La nouvelle version prend en charge deux autres types de tâches: Python Wheel Task et Delta Live Tables Pipeline Task.
Ce projet accueille les contributions et les suggestions. La plupart des contributions vous obligent à accepter un accord de licence de contributeur (CLA) déclarant que vous avez le droit de faire et en fait, accordez-nous les droits d'utilisation de votre contribution. Pour plus de détails, visitez le contrat de licence de contributeur Microsoft (CLA).
Lorsque vous soumettez une demande de traction, un CLA-BOT déterminera automatiquement si vous devez fournir un CLA et décorer le PR de manière appropriée (par exemple, étiqueter, commentaire). Suivez simplement les instructions fournies par le bot. Vous n'aurez besoin de le faire qu'une seule fois sur tous les dépositions en utilisant notre CLA.
Ce projet a adopté le code de conduite open source Microsoft. Pour plus d'informations, consultez le code de conduite FAQ ou contactez [email protected] avec toute question ou commentaire supplémentaire.