ワークフローでDelta Live Tablesパイプラインを実行する


Run a Delta Live Tables pipeline in a workflow | Databricks on AWS [2022/1/12時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Databricksジョブ、Apache Airflow、Azure Data Factoryのデータ処理ワークフローの一部としてDelta Live Tablesパイプラインを実行することができます。

ジョブ

データ処理ワークフローを実行するために、Databricksジョブのマルチタスクをオーケストレーションすることができます。ジョブにDelta Live Tablesパイプラインを含めるには、ジョブを作成する際にPipelineタスクを使用します。

Apache Airflow

Apache Airflowは、データワークフローを管理、スケジュールするためのオープンソースのソリューションです。Airflowはワークフローをオペレーションの有向非巡回グラフ(DAG)として表現します。Pythonでワークフローを定義して、Airflowは処理の実行とスケジューリングを管理します、DatabricksでAirflowをインストールし、活用するための情報についてはApache Airflowをご覧ください。

Airflowワークフローの一部としてDelta Live Tablesパイプラインを実行するためには、DatabricksSubmitRunOperatorを使用してください。

要件

AirflowのDelta Live Tablesサポートを使用するには、以下の要件を満足する必要があります。

  • Airflowバージョン2.1.0以降
  • Databricks providerパッケージバージョン2.1.0以降

サンプル

以下のサンプルでは、ID8279d543-063c-4d63-9926-dae38e35ce8bのDelta Live Tablesパイプラインのアップデートを起動するAirflow DAGを作成しています。

Python
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_IDをお使いのワークスペースのAirflowコネクションのIDで置き換えてください。

このサンプルをairflow/dagsディレクトリに保存し、DAGを参照・実行するためにはAirflowのUIを使用します。パイプラインのアップデートの詳細を参照するには、Delta Live TablesのUIを使用します。

Azure Data Factory

Azure Data Factoryは、データのインテグレーション、変換ワークフローのオーケストレーションを実現するクラウドベースのETLサービスです。Azure Data Factoryは、ノートブック、JARタスク、Pythonスクリプトを含むワークフローのDatabricksタスクの実行を直接サポートしています。Azure Data FactoryのWebアクティビティからDelta Live TablesのAPIを呼び出すことで、ワークフローにパイプラインを含めることができます。

  1. データファクトリーを作成するか、既存のデータファクトリーを開きます。

  2. 作成が完了したら、データファクトリーのページを開き、Open Azure Data Factory Studioタイルをクリックします。Azure Data FactoryのUIが表示されます。

  3. Databricksリンクサービスを作成します。

  4. Azure Data Factory StudioのUIのNewドロップダウンからPipelineを選択し、新規Azure Data Factoryパイプラインを作成します。

  5. Activitiesツールボックスで、Generalを展開し、パイプラインキャンバスにWebアクティビティをドラッグします。Settingsタブをクリックし、以下を入力します。

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates

    <databricks-instance>Databricksのワークスペースインスタンス名、例えば、dbc-a1b2345c-d6e7.cloud.databricks.comで置き換えてください。

    <pipeline-id>をパイプラインIDで置き換えてください。

    • Method: ドロップダウンからPOSTを選択します。
    • Headers: + Newをクリックします。NameテキストボックスにAuthorizationを入力します。ValueBearer <personal-access-token>を入力します。

    <personal-access-token>をDatabricksパーソナルアクセストークンで置き換えてください。

    • Body: 追加のリクエストパラメーターを指定するには、パラメーターを含むJSONドキュメントを入力します。例えば、パイプラインの全てのデータのアップデート、再処理を起動するには:{"full_refresh": "true"}を指定します。追加のリクエストパラメーターがない場合は、空の中括弧({})を指定します。

Webアクティビティをテストするには、Data Factory UIのパイプラインツールバーのDebugをクリックします。Azure Data FactoryパイプラインのOutputタブに、エラーを含む出力、実行ステータスが表示されます。パイプラインのアップデートの詳細を参照するにはDelta Live TablesのUIを使用します。

ティップス
一般的なワークフロー要件には、以前のタスクの完了後にタスクを起動するというものがあります。Delta Live Tablesのupdatesリクエストは非同期であり、リクエストはあっ苦デートが完了した後ではなくアップデートが起動した後に値を返します。Delta Live Tablesのアップデートに依存するお使いのAzure Data Factoryパイプラインは、アップデートが完了するまで待たなくてはなりません。アップデートの完了まで待つためのオプションは、Delta Live Tablesアップデートを起動するWebアクティビティの後にUntilアクティビティを追加するというものです。

  1. アップデートの完了後の待ち時間を、Waitアクティビティに秒数で指定します。
  2. Waitアクティビティの後に、アップデートの状態を取得するためにDelta Live TablesのGet update detailsリクエストを使用するWebアクティビティを追加します。レスポンスのstateフィールドには、アップデートの完了を含む、アップデートの状態が含まれます。
  3. Untilアクティビティの終了条件を指定するためにstateフィールドの値を使用します。stateの値に基づいてパイプラインの変数を追加し、この変数を終了条件に使用するためにSet Variableアクティビティを使用することもできます。

Databricks 無料トライアル

Databricks 無料トライアル