Delta Live Tablesによるチェンジデータキャプチャ(CDC)


Change data capture with Delta Live Tables | Databricks on AWS [2022/2/8時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

プレビュー
この機能はパブリックプレビューです。サインアップするにはRequest Access to Delta Live Tablesにアクセスしてください。

ソースデータの変更に基づいてテーブルをアップデートするために、Delta Live Tablesでチェンジデータキャプチャ(CDC)を利用することができます。CDCはDelta Live TablesのSQLおよびPythonインタフェースでサポートされています。

要件

Delta Live Tables CDCを使用するためには、パイプライン設定に以下の設定を追加することで、それぞれのパイプラインで機能を有効化する必要があります。

JSON
{
  "configuration": {
    "pipelines.applyChangesPreviewEnabled": "true"
  }
}

SQL

Delta Live Tables CDCの機能を使用するためには、APPLY CHANGES INTO文を使用します。

SQL
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
KEYS

ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。

この句は必須です。
WHERE

パーティションプルーニングのような最適化処理を起動するために、ソース、ターゲットテーブルの両方に適用される条件です。この条件はソースのレコードを削除することに使用することはできません。ソーステーブルのすべてのCDCレコードはこの条件を満足する必要があり、そうでない場合にはエラーが発生します。WHERE句の使用は任意であり、お使いの処理で特定の最適化が必要な場合に使用されるべきです。

この句は任意です。
IGNORE NULL UPDATES

ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、IGNORE NULL UPDATESが指定されている場合、nullを含むカラムは、ターゲットにおける既存の値を維持します。これはnull値を持つネストされたカラムにも適用されます。

この句は任意です。

デフォルトでは、null値を持つ既存カラムを上書きします。
APPLY AS DELETE WHEN

CDCイベントがupsertではなくDELETEとして取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティpipelines.cdc.tombstoneGCThresholdInSecondsで設定することができます。

この句は任意です。
SEQUENCE BY

ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。

この句は任意です。
COLUMNS

ターゲットテーブルに含めるカラムのサブセットを指定します。以下のいずれかが可能です。
  • 含めるカラムの完全なリストを指定:COLUMNS (userId, name, city)
  • 除外するカラムのリストを指定:COLUMNS * EXCEPT (operation, sequenceNum)
この句は任意です。

デフォルトでは、COLUMNS句が指定されない場合、ターゲットテーブルには全てのカラムが含まれます。

INSERTUPDATEイベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETEイベントの挙動をAPPLY AS DELETE WHEN条件で指定することも可能です。

Python

Delta Live Tables CDC機能を使うためには、Python APIのapply_changes()関数を使用します。

Python
apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_delete = None,
  column_list = None,
  except_column_list = None
)
引数
target
Type:str
更新されるテーブル名です。
このパラメーターは必須です。
source
Type:str
CDCレコードを含むデータソースです。
このパラメーターは必須です。
keys
Type:list
ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。

以下のいずれかの方法で指定できます:
  • 文字列のリスト:["userId", "orderId"]
  • Spark SQLのcol関数のリスト:[col("userId"), col("orderId"]
col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。
sequence_by
Type:strあるいはcol()
ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。

以下のいずれかの方法で指定できます:
  • 文字列:"sequenceNum"
  • Spark SQLのcol関数:col("sequenceNum")
col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは必須です。
ignore_null_updates
Type:bool
ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、ignore_null_updatesTrueの場合、nullを含むカラムは、ターゲットにおける既存の値を維持します。これはnull値を持つネストされたカラムにも適用されます。ignore_null_updatesFalseの場合、既存の値はnullで上書きされます。

このパラメーターは任意です。

デフォルトはFalseです。
apply_as_delete
Type:strあるいはexpr()
CDCイベントがupsertではなくDELETEとして取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティpipelines.cdc.tombstoneGCThresholdInSecondsで設定することができます。

以下のいずれかの方法で指定できます。
  • 文字列:"Operation = 'DELETE'"
  • Spark SQLのexpr()関数:expr("Operation = 'DELETE'")
このパラメーターは任意です。
column_list except_column_list
Type:list
ターゲットテーブルに含めるカラムのサブセットを指定します。含めるカラムの完全なリストを指定するにはcolumn_listを使用します。除外するカラムを指定するにはexcept_column_listを使用します。文字列のリストあるいはSpark SQLのcol()関数で値を宣言することができます
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")
col("sequenceNum")col()関数の引数にはクオリファイアを含めることはできません。例えば、col(userId)は使えますが、col(source.userId)は使えません。

このパラメーターは任意です。

column_listexcept_column_list引数が関数に渡されていない場合、デフォルトではターゲットテーブルの全てのカラムが含まれます。

INSERTUPDATEイベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETEイベントの挙動を引数apply_as_deleteで指定することも可能です。

注意

  • APPLY CHANGES INTOクエリーやapply_changes関数を実行する前に、ターゲットテーブルが作成されていることを確認してください。サンプルクエリーを参照ください。
  • 出力行数などターゲットテーブルのメトリクスは利用できません。
  • スキーマエボリューションはサポートされていません。
  • APPLY CHANGES INTOクエリーやapply_changes関数ではエクスペクテーションはサポートされていません。ソース、ターゲットデータセットにエクスペクテーションを適用するには、以下の手順を踏んでください。
    • 必要なエクスペクテーションを持つ中間テーブルを定義することでソースデータにエクスペクテーションを追加し、このデータセットをターゲットテーブルに対するソースとします。
    • ターゲットテーブルから入力データを読み込む下流のテーブルを用いてターゲットデータに対するエクスペクテーションを追加します。

テーブルプロパティ

以下のテーブルプロパティは、DELETEイベントにおけるtombstone管理を制御するために追加されています。

テーブルプロパティ
pipelines.cdc.tombstoneGCThresholdInSeconds
out-of-orderのデータに期待する最大の間隔にマッチする値を設定します。
pipelines.cdc.tombstoneGCFrequencyInSeconds
tombstoneとチェックされたデータのクリーンアップの頻度を指定します。
デフォルト:5分

サンプル

このサンプルでは、以下のソースイベントに基づいてターゲットテーブルをアップデートするDelta Live Tablesのクエリーをデモします。

  1. 新規ユーザーのレコードを作成
  2. ユーザーレコードを削除
  3. ユーザーレコードを更新。out-of-orderのイベントの取り扱いをデモするために、最後のUPDATEオペレーションは遅れて到着したので、ターゲットテーブルからは除外されます。
userId name city operation sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
123 null null DELETE 5
125 Mercedes Guadalajara UPDATE 5
125 Mercedes Mexicali UPDATE 4
123 Isabel Chihuahua UPDATE 4

このサンプルのクエリーを実行した後では、ターゲットテーブルには以下のレコードが含まれることになります。

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara

クエリー

SQL
-- Create sample input events.
CREATE LIVE TABLE
  users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  -- One new user.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 5),
  (125, "Mercedes", "Guadalajara", "UPDATE", 5),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (123, "Isabel",   "Chihuahua",   "UPDATE", 4),
  (125, "Mercedes", "Mexicali",    "UPDATE", 4)
);

-- Create and populate the target table.
CREATE INCREMENTAL LIVE TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(live.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum);

Databricks 無料トライアル

Databricks 無料トライアル