توفر مكتبة عميل Azure Databricks واجهة مريحة لأتمتة مساحة عمل DataBricks الخاصة بك من خلال Azure Databricks Rest API.
يعتمد تنفيذ هذه المكتبة على إصدار REST API 2.0 وما فوق.
الفرع الرئيسي هو للنسخة 2. الإصدار 1.1 (مستقر) في الفرع/1.1.
يجب أن يكون لديك رموز وصول شخصية (PAT) أو Azure Active Directory Tokens (AAD Token) للوصول إلى واجهة برمجة تطبيقات REST Databricks.
استراحة API | إصدار | وصف |
---|---|---|
مجموعات | 2.0 | تتيح لك API Clusters API إنشاء المجموعات والبدء والتحرير والقائمة وإنهاء وحذف. |
وظائف | 2.1 | تتيح لك واجهة برمجة تطبيقات الوظائف إدارة وظائف Azure Databricks برمجياً. |
DBFS | 2.0 | واجهة برمجة تطبيقات DBFS هي واجهة برمجة تطبيقات DATABRICKS التي تجعل من السهل التفاعل مع مصادر البيانات المختلفة دون الحاجة إلى تضمين بيانات الاعتماد الخاصة بك في كل مرة تقرأ فيها ملفًا. |
أسرار | 2.0 | تتيح لك API أسرار إدارة الأسرار والنطاقات السرية وأذونات الوصول. |
مجموعات | 2.0 | تتيح لك API للمجموعات إدارة مجموعات المستخدمين. |
المكتبات | 2.0 | تتيح لك واجهة برمجة تطبيقات المكتبات تثبيت المكتبات وإلغاء تثبيتها والحصول على حالة المكتبات على الكتلة. |
رمز | 2.0 | يتيح لك API الرمز المميز إنشاء الرموز المميزة التي يمكن استخدامها لمصادقة واجهات Azure Databricks Rest و REACK ، وإلغاءها ، وإلغاءها. |
مساحة العمل | 2.0 | تتيح لك واجهة برمجة تطبيقات مساحة العمل إدراج واستيراد وتصدير وحذف دفاتر الملاحظات والمجلدات. |
مثيل | 2.0 | يتيح لك API Pools API إنشاء وحذف وحذف وسرقة تجمعات مثيل. |
أذونات | 2.0 | تتيح لك API أذونات إدارة أذونات الرمز المميز والمجموعة والمجمع والوظيفة وخط أنابيب Delta Live Tables و Notebook و Directory و MLFlow ، ونموذج MLFLOW المسجل ، و SQL Warehouse ، و REPO و CLUSTER POCIES. |
سياسات الكتلة | 2.0 | تتيح لك واجهة برمجة تطبيقات سياسات الكتلة إنشاء سياسات الكتلة وإدراجها وتحريرها. |
البرامج النصية init العالمية | 2.0 | تتيح API Scripts Global Init Mostables Azure Databricks إضافة نصوص تهيئة تهيئة الكتلة العالمية بطريقة آمنة ومسيطر عليها. |
مستودعات SQL | 2.0 | تتيح لك واجهة برمجة تطبيقات مستودعات SQL إدارة موارد حسابية تتيح لك تشغيل أوامر SQL على كائنات البيانات داخل Databricks SQL. |
repos | 2.0 | تتيح API repos للمستخدمين إدارة إعادة repos git الخاصة بهم. يمكن للمستخدمين استخدام واجهة برمجة التطبيقات للوصول إلى جميع عمليات إعادة الشراء التي يقومون بإدارة الأذونات عليها. |
خطوط الأنابيب (طاولات دلتا الحية) | 2.0 | تتيح لك واجهة برمجة تطبيقات Delta Live Tables إنشاء التفاصيل وتحريرها وحذفها وبدءها وعرضها حول خطوط الأنابيب. |
تحقق من عينة مشروع للحصول على المزيد من الاستخدامات التفصيلية.
في الأمثلة التالية ، يجب تعيين token
baseUrl
على عنوان URL لقاعدة مساحة العمل ، والذي يبدو وكأنه https://adb-<workspace-id>.<random-number>.azuredatabricks.net
DataBricks الرمز المميز للوصول الشخصي.
using ( var client = DatabricksClient . CreateClient ( baseUrl , token ) )
{
// ...
}
var clusterConfig = ClusterAttributes
. GetNewClusterConfiguration ( " Sample cluster " )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 )
. WithAutoScale ( 3 , 7 )
. WithAutoTermination ( 30 )
. WithClusterLogConf ( " dbfs:/logs/ " )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithClusterMode ( ClusterMode . SingleNode ) ;
var clusterId = await client . Clusters . Create ( clusterConfig ) ;
using Policy = Polly . Policy ;
static async Task WaitForCluster ( IClustersApi clusterClient , string clusterId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < ClusterInfo > ( info => info . State is not ( ClusterState . RUNNING or ClusterState . ERROR or ClusterState . TERMINATED ) )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Failed to query cluster info - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var info = await clusterClient . Get ( clusterId ) ;
Console . WriteLine ( $" [ { DateTime . UtcNow : s } ] Cluster: { clusterId } t State: { info . State } t Message: { info . StateMessage } " ) ;
return info ;
} ) ;
}
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Terminate ( clusterId ) ;
await WaitForCluster ( client . Clusters , clusterId ) ;
await client . Clusters . Delete ( clusterId ) ;
// Job schedule
var schedule = new CronSchedule
{
QuartzCronExpression = " 0 0 9 ? * MON-FRI " ,
TimezoneId = " Europe/London " ,
PauseStatus = PauseStatus . UNPAUSED
} ;
// Run with a job cluster
var newCluster = ClusterAttributes . GetNewClusterConfiguration ( )
. WithClusterMode ( ClusterMode . SingleNode )
. WithNodeType ( NodeTypes . Standard_D3_v2 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_10_4 ) ;
// Create job settings
var jobSettings = new JobSettings
{
MaxConcurrentRuns = 1 ,
Schedule = schedule ,
Name = " Sample Job "
} ;
// Adding 3 tasks to the job settings.
var task1 = jobSettings . AddTask ( " task1 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task1 " )
. WithNewCluster ( newCluster ) ;
var task2 = jobSettings . AddTask ( " task2 " , new NotebookTask { NotebookPath = SampleNotebookPath } )
. WithDescription ( " Sample Job - task2 " )
. WithNewCluster ( newCluster ) ;
jobSettings . AddTask ( " task3 " , new NotebookTask { NotebookPath = SampleNotebookPath } , new [ ] { task1 , task2 } )
. WithDescription ( " Sample Job - task3 " )
. WithNewCluster ( newCluster ) ;
// Create the job.
Console . WriteLine ( " Creating new job " ) ;
var jobId = await client . Jobs . Create ( jobSettings ) ;
Console . WriteLine ( " Job created: {0} " , jobId ) ;
// Start the job and retrieve the run id.
Console . WriteLine ( " Run now: {0} " , jobId ) ;
var runId = await client . Jobs . RunNow ( jobId ) ;
using Policy = Polly . Policy ;
static async Task WaitForRun ( IJobsApi jobClient , long runId , int pollIntervalSeconds = 15 )
{
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. OrResult < RunState > ( state =>
state . LifeCycleState is RunLifeCycleState . PENDING or RunLifeCycleState . RUNNING
or RunLifeCycleState . TERMINATING )
. WaitAndRetryForeverAsync (
_ => TimeSpan . FromSeconds ( pollIntervalSeconds ) ,
( delegateResult , _ ) =>
{
if ( delegateResult . Exception != null )
{
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Failed to query run - { delegateResult . Exception } " ) ;
}
} ) ;
await retryPolicy . ExecuteAsync ( async ( ) =>
{
var ( run , _ ) = await jobClient . RunsGet ( runId ) ;
Console . WriteLine (
$" [ { DateTime . UtcNow : s } ] Run: { runId } t LifeCycleState: { run . State . LifeCycleState } t ResultState: { run . State . ResultState } t Completed: { run . IsCompleted } "
) ;
return run . State ;
} ) ;
}
await WaitForRun ( client . Jobs , runId ) ;
var ( run , _ ) = await client . Jobs . RunsGet ( runId ) ;
foreach ( var runTask in run . Tasks )
{
var viewItems = await client . Jobs . RunsExport ( runTask . RunId ) ;
foreach ( var viewItem in viewItems )
{
Console . WriteLine ( $" Exported view item from run { runTask . RunId } , task " { runTask . TaskKey } " , view " { viewItem . Name } " " ) ;
Console . WriteLine ( " ==================== " ) ;
Console . WriteLine ( viewItem . Content [ .. 200 ] + " ... " ) ;
Console . WriteLine ( " ==================== " ) ;
}
}
خلق نطاق سري
const string scope = " SampleScope " ;
await client . Secrets . CreateScope ( scope , null ) ;
إنشاء نص نص
var secretName = " secretkey.text " ;
await client . Secrets . PutSecret ( " secret text " , scope , secretName ) ;
إنشاء سر ثنائي
var secretName = " secretkey.bin " ;
await client . Secrets . PutSecret ( new byte [ ] { 0x01 , 0x02 , 0x03 , 0x04 } , scope , secretName ) ;
clusters/create
، jobs/run-now
، jobs/runs/submit
الرمز المميز لدعم واجهات برمجة التطبيقات. من المميز الاختياري ضمان عجز الطلبات. إذا كان هناك مورد (مجموعة أو تشغيل) مع الرمز المميز المتوفر بالفعل ، فإن الطلب لا ينشئ موردًا جديدًا ولكنه يعيد معرف المورد الحالي بدلاً من ذلك.
إذا قمت بتحديد رمز التعريف ، عند الفشل ، يمكنك إعادة إعادة المحاولة حتى ينجح الطلب. تضمن Databricks أن يتم إطلاق مورد واحد بالضبط باستخدام رمز Idempotency هذا.
يوضح الرمز التالي كيفية استخدام Polly لإعادة إعادة الطلب باستخدام idempotency_token
إذا فشل الطلب.
using Polly ;
double retryIntervalSec = 15 ;
string idempotencyToken = Guid . NewGuid ( ) . ToString ( ) ;
var clusterInfo = ClusterAttributes . GetNewClusterConfiguration ( " my-cluster " )
. WithNodeType ( " Standard_D3_v2 " )
. WithNumberOfWorkers ( 25 )
. WithRuntimeVersion ( RuntimeVersions . Runtime_7_3 ) ;
var retryPolicy = Policy . Handle < WebException > ( )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . BadGateway )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . InternalServerError )
. Or < ClientApiException > ( e => e . StatusCode == HttpStatusCode . ServiceUnavailable )
. Or < ClientApiException > ( e => e . Message . Contains ( " " error_code " : " TEMPORARILY_UNAVAILABLE " " ) )
. Or < TaskCanceledException > ( e => ! e . CancellationToken . IsCancellationRequested )
. WaitAndRetryForeverAsync ( _ => TimeSpan . FromSeconds ( retryIntervalSec ) ) ;
var clusterId = await retryPolicy . ExecuteAsync ( async ( ) => await client . Clusters . Create ( clusterInfo , idempotencyToken ) ) ;
أهداف V2 من المكتبة .NET 6 وقت التشغيل.
تم إعادة تصميم API للوظائف للتوافق مع الإصدار 2.1 من API REST.
في الإصدار السابق ، تدعم API Jobs مهمة واحدة فقط لكل وظيفة. تدعم واجهة برمجة تطبيقات Jobs الجديدة مهام متعددة لكل وظيفة ، حيث يتم تمثيل المهام باعتبارها DAG.
يدعم الإصدار الجديد نوعين أخريين من المهمة: مهمة Python Wheel ومهمة خط أنابيب Delta Live Tables.
يرحب هذا المشروع بالمساهمات والاقتراحات. تطلب منك معظم المساهمات الموافقة على اتفاقية ترخيص المساهم (CLA) مع إعلان أن لديك الحق في ذلك في الواقع ، ويفعلنا في الواقع حقوق استخدام مساهمتك. لمزيد من التفاصيل ، تفضل بزيارة اتفاقية ترخيص Microsoft (CLA).
عند إرسال طلب سحب ، ستحدد CLA-Bot تلقائيًا ما إذا كنت بحاجة إلى توفير CLA وتزيين العلاقات العامة بشكل مناسب (على سبيل المثال ، التسمية ، التعليق). ببساطة اتبع الإرشادات التي يقدمها الروبوت. ستحتاج فقط إلى القيام بذلك مرة واحدة عبر جميع عمليات إعادة الشراء باستخدام CLA لدينا.
اعتمد هذا المشروع رمز سلوك المصدر المفتوح Microsoft. لمزيد من المعلومات ، راجع مدونة الشهادة الأسئلة الشائعة أو الاتصال بـ [email protected] مع أي أسئلة أو تعليقات إضافية.