Delta Live Tablesによるチェンジデータキャプチャ(CDC)
Change data capture with Delta Live Tables | Databricks on AWS [2022/2/8時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
プレビュー
この機能はパブリックプレビューです。サインアップするにはRequest Access to Delta Live Tablesにアクセスしてください。
ソースデータの変更に基づいてテーブルをアップデートするために、Delta Live Tablesでチェンジデータキャプチャ(CDC)を利用することができます。CDCはDelta Live TablesのSQLおよびPythonインタフェースでサポートされています。
要件
Delta Live Tables CDCを使用するためには、パイプライン設定に以下の設定を追加することで、それぞれのパイプラインで機能を有効化する必要があります。
{
"configuration": {
"pipelines.applyChangesPreviewEnabled": "true"
}
}
SQL
Delta Live Tables CDCの機能を使用するためには、APPLY CHANGES INTO
文を使用します。
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[WHERE condition]
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
句 |
---|
KEYS ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。 この句は必須です。 |
WHERE パーティションプルーニングのような最適化処理を起動するために、ソース、ターゲットテーブルの両方に適用される条件です。この条件はソースのレコードを削除することに使用することはできません。ソーステーブルのすべてのCDCレコードはこの条件を満足する必要があり、そうでない場合にはエラーが発生します。WHERE句の使用は任意であり、お使いの処理で特定の最適化が必要な場合に使用されるべきです。 この句は任意です。 |
IGNORE NULL UPDATES ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、IGNORE NULL UPDATESが指定されている場合、 null を含むカラムは、ターゲットにおける既存の値を維持します。これはnull 値を持つネストされたカラムにも適用されます。この句は任意です。 デフォルトでは、 null 値を持つ既存カラムを上書きします。 |
APPLY AS DELETE WHEN CDCイベントがupsertではなく DELETE として取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティのpipelines.cdc.tombstoneGCThresholdInSeconds で設定することができます。この句は任意です。 |
SEQUENCE BY ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。 この句は任意です。 |
COLUMNS ターゲットテーブルに含めるカラムのサブセットを指定します。以下のいずれかが可能です。
デフォルトでは、 COLUMNS 句が指定されない場合、ターゲットテーブルには全てのカラムが含まれます。 |
INSERT
、UPDATE
イベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETE
イベントの挙動をAPPLY AS DELETE WHEN
条件で指定することも可能です。
Python
Delta Live Tables CDC機能を使うためには、Python APIのapply_changes()
関数を使用します。
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_delete = None,
column_list = None,
except_column_list = None
)
引数 |
---|
target Type: str 更新されるテーブル名です。 このパラメーターは必須です。 |
source Type: str CDCレコードを含むデータソースです。 このパラメーターは必須です。 |
keys Type: list ソースデータで行を一意に特定するカラム、あるいはカラムの組み合わせです。これは、ターゲットテーブルの特定のレコードに適用されるCDCイベントの特定に使用されます。 以下のいずれかの方法で指定できます:
col() 関数の引数にはクオリファイアを含めることはできません。例えば、col(userId) は使えますが、col(source.userId) は使えません。このパラメーターは必須です。 |
sequence_by Type: str あるいはcol() ソースデータにおけるCDCイベントの論理的順序を指定するカラム名です。Delta Live Tablesは、out-of-orderな変更イベントを取り扱うために、この順序を使用します。 以下のいずれかの方法で指定できます:
col() 関数の引数にはクオリファイアを含めることはできません。例えば、col(userId) は使えますが、col(source.userId) は使えません。このパラメーターは必須です。 |
ignore_null_updates Type: bool ターゲットカラムのサブセットに対する取り込み更新を許可します。CDCイベントが既存のレコードと合致し、 ignore_null_updates がTrue の場合、null を含むカラムは、ターゲットにおける既存の値を維持します。これはnull 値を持つネストされたカラムにも適用されます。ignore_null_updates がFalse の場合、既存の値はnull で上書きされます。このパラメーターは任意です。 デフォルトは False です。 |
apply_as_delete Type: str あるいはexpr() CDCイベントがupsertではなく DELETE として取り扱われるべきタイミングを指定します。out-of-orderのデータを取り扱うために、削除されたレコードは背後のDeltaテーブルで一時的にtombstoneとして保持され、これらのtombstoneを表示するビューがメタストアに作成されます。保持期間はテーブルプロパティのpipelines.cdc.tombstoneGCThresholdInSeconds で設定することができます。以下のいずれかの方法で指定できます。
|
column_list except_column_list Type: list ターゲットテーブルに含めるカラムのサブセットを指定します。含めるカラムの完全なリストを指定するには column_list を使用します。除外するカラムを指定するにはexcept_column_list を使用します。文字列のリストあるいはSpark SQLのcol() 関数で値を宣言することができます
col("sequenceNum") col() 関数の引数にはクオリファイアを含めることはできません。例えば、col(userId) は使えますが、col(source.userId) は使えません。このパラメーターは任意です。 column_list やexcept_column_list 引数が関数に渡されていない場合、デフォルトではターゲットテーブルの全てのカラムが含まれます。 |
INSERT
、UPDATE
イベントのデフォルトの挙動は、ソースからのCDCイベントをupsertするというものです:指定されたキーにマッチするターゲットテーブルの行を更新する、あるいはターゲットテーブルにマッチする行がない場合には行を挿入します。DELETE
イベントの挙動を引数apply_as_delete
で指定することも可能です。
注意
APPLY CHANGES INTO
クエリーやapply_changes
関数を実行する前に、ターゲットテーブルが作成されていることを確認してください。サンプルクエリーを参照ください。- 出力行数などターゲットテーブルのメトリクスは利用できません。
- スキーマエボリューションはサポートされていません。
APPLY CHANGES INTO
クエリーやapply_changes
関数ではエクスペクテーションはサポートされていません。ソース、ターゲットデータセットにエクスペクテーションを適用するには、以下の手順を踏んでください。
- 必要なエクスペクテーションを持つ中間テーブルを定義することでソースデータにエクスペクテーションを追加し、このデータセットをターゲットテーブルに対するソースとします。
- ターゲットテーブルから入力データを読み込む下流のテーブルを用いてターゲットデータに対するエクスペクテーションを追加します。
テーブルプロパティ
以下のテーブルプロパティは、DELETE
イベントにおけるtombstone管理を制御するために追加されています。
テーブルプロパティ |
---|
pipelines.cdc.tombstoneGCThresholdInSeconds out-of-orderのデータに期待する最大の間隔にマッチする値を設定します。 |
pipelines.cdc.tombstoneGCFrequencyInSeconds tombstoneとチェックされたデータのクリーンアップの頻度を指定します。 デフォルト:5分 |
サンプル
このサンプルでは、以下のソースイベントに基づいてターゲットテーブルをアップデートするDelta Live Tablesのクエリーをデモします。
- 新規ユーザーのレコードを作成
- ユーザーレコードを削除
- ユーザーレコードを更新。out-of-orderのイベントの取り扱いをデモするために、最後の
UPDATE
オペレーションは遅れて到着したので、ターゲットテーブルからは除外されます。
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
123 | null | null | DELETE | 5 |
125 | Mercedes | Guadalajara | UPDATE | 5 |
125 | Mercedes | Mexicali | UPDATE | 4 |
123 | Isabel | Chihuahua | UPDATE | 4 |
このサンプルのクエリーを実行した後では、ターゲットテーブルには以下のレコードが含まれることになります。
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
クエリー
-- Create sample input events.
CREATE LIVE TABLE
users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(123, "Isabel", "Monterrey", "INSERT", 1),
(124, "Raul", "Oaxaca", "INSERT", 1),
-- One new user.
(125, "Mercedes", "Tijuana", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 5),
(125, "Mercedes", "Guadalajara", "UPDATE", 5),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(123, "Isabel", "Chihuahua", "UPDATE", 4),
(125, "Mercedes", "Mexicali", "UPDATE", 4)
);
-- Create and populate the target table.
CREATE INCREMENTAL LIVE TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(live.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum);
Databricks 無料トライアル
Author And Source
この問題について(Delta Live Tablesによるチェンジデータキャプチャ(CDC)), 我々は、より多くの情報をここで見つけました https://qiita.com/taka_yayoi/items/5319b9b010ce4703e98a著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .