Die Azure Databricks Client -Bibliothek bietet eine bequeme Schnittstelle zum Automatisieren Ihres Azure Databricks Workspace über die Azure -Datenbeutel REST -API.
Die Implementierung dieser Bibliothek basiert auf REST -API -Version 2.0 und höher.
Der Master -Zweig ist für Version 2. Version 1.1 (stabil) befindet sich in den Releases/1.1 -Zweig.
Sie müssen über persönliche Zugriffstoken (PAT) oder Azure Active Directory -Token (AAD -Token) verfügen, um auf die REST -API von Databricks zuzugreifen.
Ruhe -API | Version | Beschreibung |
---|---|---|
Cluster | 2.0 | Mit der Cluster -API können Sie Cluster erstellen, starten, bearbeiten, listen, beenden und löschen. |
Jobs | 2.1 | Mit der Job -API können Sie azure Databricks -Jobs programmgesteuert verwalten. |
DBFS | 2.0 | Die DBFS -API ist eine Datenbank -API, die es einfach macht, mit verschiedenen Datenquellen zu interagieren, ohne dass Sie Ihre Anmeldeinformationen jedes Mal einbeziehen müssen, wenn Sie eine Datei lesen. |
Geheimnisse | 2.0 | Mit der API von Secrets können Sie Geheimnisse, geheime Bereiche und Zugriffsberechtigungen verwalten. |
Gruppen | 2.0 | Mit der API der Gruppen können Sie Gruppen von Benutzern verwalten. |
Bibliotheken | 2.0 | Mit der API der Bibliotheken können Sie Bibliotheken installieren und deinstallieren und den Status von Bibliotheken auf einem Cluster erhalten. |
Token | 2.0 | Mit der Token -API können Sie Token erstellen, auflisten und widerrufen, mit denen Sie Azure Databricks Rest -APIs authentifizieren und zugreifen können. |
Arbeitsplatz | 2.0 | Mit der API der Arbeitsbereich können Sie Notizbücher und Ordner auflisten, importieren, exportieren und löschen. |
Instanzpool | 2.0 | Mit der API von Instance Pools können Sie Pools erstellen, bearbeiten, löschen und listen. |
Berechtigungen | 2.0 | Mit der API -Berechtigungs -API können Sie Berechtigungen für Token, Cluster, Pool, Job, Delta Live -Tabellen -Pipeline, Notizbuch, Verzeichnis, MLFlow -Experiment, MLFlow -Registrierter Modell, SQL Warehouse, Repo- und Cluster -Richtlinien verwalten. |
Cluster -Richtlinien | 2.0 | Mit der API von Cluster -Richtlinien können Sie Cluster -Richtlinien erstellen, auflisten und bearbeiten. |
Globale Init -Skripte | 2.0 | Mit der API Global Init Skrips können Azure Databricks -Administratoren auf sichere und kontrollierte Weise globale Cluster -Initialisierungsskripte hinzufügen. |
SQL -Lagerhäuser | 2.0 | Mit der SQL Warehouses -API können Sie Rechnungsressourcen verwalten, mit denen Sie SQL -Befehle für Datenobjekte in Databricks SQL ausführen können. |
Repos | 2.0 | Mit der Repos -API können Benutzer ihre Git -Repos verwalten. Benutzer können die API verwenden, um auf alle Repos zugreifen zu können, für die Berechtigungen verwaltet werden. |
Pipelines (Delta lebende Tische) | 2.0 | Mit der API von Delta Live Tabellen können Sie Details zu Pipelines erstellen, bearbeiten, löschen, starten und anzeigen. |
Weitere detailliertere Verwendungen finden Sie im Beispielprojekt.
In den folgenden Beispielen token
die baseUrl
Variable auf die Arbeitsbereichs-Basis-URL eingestellt werden, die wie https://adb-<workspace-id>.<random-number>.azuredatabricks.net
Databricks Personal Access Token.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
Geheimen Umfang erstellen
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
Textgeheimnis erstellen
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
Binäres Geheimnis erstellen
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
Die clusters/create
, jobs/run-now
und jobs/runs/submit
Unterstützung für APIs unterstützen idealcy token. Es ist optional Token, um die Idempotenz von Anforderungen zu gewährleisten. Wenn eine Ressource (ein Cluster oder ein Lauf) mit dem bereits vorgesehenen Token bereits vorhanden ist, erstellt die Anfrage keine neue Ressource, sondern gibt stattdessen die ID der vorhandenen Ressource zurück.
Wenn Sie das idealcy -Token angeben, können Sie nach dem Erfolg der Anfrage nach dem Ausfall wiederholen. Databricks garantiert, dass genau eine Ressource mit diesem Idempotenz -Token gestartet wird.
Der folgende Code zeigt, wie Polly die Anforderung mit idempotency_token
wiederholt, wenn die Anforderung fehlschlägt.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
Die V2 der Bibliothek zielt auf .net 6 Laufzeit.
Die Job -API wurde neu gestaltet, um sich mit der Version 2.1 der Rest -API auszurichten.
In der vorherigen Version unterstützt die Job -API nur eine einzelne Aufgabe pro Job. Die neue Job -API unterstützt mehrere Aufgaben pro Job, bei denen die Aufgaben als DAG dargestellt werden.
Die neue Version unterstützt zwei weitere Arten von Aufgaben: Python Wheel Task und Delta Live Tables Pipeline Task.
Dieses Projekt begrüßt Beiträge und Vorschläge. Bei den meisten Beiträgen müssen Sie einer Mitarbeiters Lizenzvereinbarung (CLA) zustimmen, in der Sie erklären, dass Sie das Recht haben und uns tatsächlich tun, um uns die Rechte zu gewähren, Ihren Beitrag zu verwenden. Weitere Informationen finden Sie unter Microsoft Contributor Lizenzvereinbarung (CLA).
Wenn Sie eine Pull-Anfrage einreichen, enthält ein ClA-BOT automatisch, ob Sie eine CLA angeben und die PR angemessen dekorieren müssen (z. B. Etikett, Kommentar). Befolgen Sie einfach die vom Bot bereitgestellten Anweisungen. Sie müssen dies nur einmal über alle Repos mit unserem CLA tun.
Dieses Projekt hat den Microsoft Open Source -Verhaltenscode übernommen. Weitere Informationen finden Sie im FAQ oder wenden Sie sich an [email protected] mit zusätzlichen Fragen oder Kommentaren.