Azure Data Factory or Synapse Mapping Dataflowの変更フィードの紹介
はじめに
Flowlets and Change Feed now GA in Azure Data Factory
の2機能がGAしましたので、実用タイミングも踏まえて試してみます。
前提知識
What's Data Lake ? Azure Data Lake best practice
より、データレイクの層構造がわかるとユースケースが想像しやすいと思います。
※今回はrawにDelta Lakeなどを利用しない構成で考えてみてます。sparkが利用できる環境ではrawにはDelta Lakeを用いることをお勧めします。
ユースケース
以下のようなraw->entichのケースで使ってみます。
顧客マスタが毎日日付の入ったcsvで連携され、以下のゾーンを利用して統合されます。
raw:連携されたファイルをそのまま蓄積
enrich:rawに連携された 新規ファイル のみをdelta lakeとして変換し、upsert
手順
0. 環境とデータを準備
1 Synapse Analyticsをデプロイします。(ADFでもいいですが、datalakeと接続の作成が必要です)
2 データを準備します。
Azure Data Factory の Mapping Data Flow で CSV ファイルの重複行を削除するから参考にさせていただきました。
二日分のデータでID=1のレコードのみ更新がかかっています。
ファイル名:customer_20220301.csv
"ID","Email","Name"
"1","[email protected]","Yamada Hanako"
"2","[email protected]","Tanaka Taro"
ファイル名:customer_20220302.csv
"ID","Email","Name"
"1","[email protected]","Tanaka Hanako"
"2","[email protected]","Tanaka Taro"
1. Mapping Data flowを作成します。
1 ソースを追加します。
Data Lake Strage Gen2、Delimited textのデータセットを作成し、
以下のプロパティに設定します。
パスは raw/cf_customer とします。
サンプルファイルからスキーマのインポートを行いましょう。
2 変更データキャプチャの設定をオンにします。また、ファイル名も格納しておきましょう。この設定により、今後rawゾーンに置かれたファイルのうち、未処理のファイルのみが処理されます。
3 シンクを設定します。
Delta Lake は差分と表示されていますが、気にせず利用します。。。
パスは enrich/cf_customer とします。
4 データ追加時のアクションを設定します。
IDをキーにしてアップサートしましょう。これで最新の状態にマスタが更新されていきます。
キーカラムも忘れずに
5 案内にしたがい、行の変更の追加 をクリック後、すべての行がアップサート対象となるようにtrue()を設定します。
6 パイプラインを構成します。
2. データを配置し、初回の結果を確認します。
1 データをアップします。
2 デバッグ実行します。
3 Delta Lakeで対象フォルダをクエリして確認します。
3. 差分データを配置し、2回目の結果を確認します。
1 データをアップします。
2 デバッグ実行し、処理された行数を確認します。1ファイル分が処理されていることがわかります。
3 正常にデータが更新されました
補足
今回はアップされたデータすべて更新されるような方式ですが、連携データを更新分のみにするか、更新チェックをデータフローで実装すれば更新のコストを削減できると思います。
参考
以下の記載があるため、パイプラインの変更には注意
最後の実行からチェックポイントを常に記録して、そこから変更を取得できるよう、パイプラインとアクティビティ名は変更しないでください。 パイプライン名またはアクティビティ名を変更すると、チェックポイントがリセットされ、次の実行は最初から開始されます。
Author And Source
この問題について(Azure Data Factory or Synapse Mapping Dataflowの変更フィードの紹介), 我々は、より多くの情報をここで見つけました https://qiita.com/ryoma-nagata/items/dd318582e19394a90550著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .