Azure Purviewで特定クラスへの分類を検出した際に通知する


この記事でやること

この記事ではPurviewで特定のクラスに分類されたイベントを検知してSlackに通知します。

準備

通知先

通知先はTeamsでもメールでも何でも良いのですが今回はSlackにしています。事前に通知用のチャネルを作ったりアクセス トークンを用意したりします。ここなどを参考にしています。

サンプルデータ

テストデータはStarter Kitにあるファイルを利用しています。今回はクレジットカード番号とメールアドレスに分類された際に通知をするのでそれらを含んだデータを使いました。

"Password","email","id","creditcard"
"b4c906d48f60cdf6a70801d9c9182793a478e1a3a2cb03542676a759db5f843f","[email protected]","5647518a-2a93-4611-9746-f050fd0a19d6","201907394460151"
"c8ee38f64609cdd6c8b80dcf888cc80d49fb25817e78dde5bc1094c42b4fa2d8","[email protected]","575d3815-3747-4aa8-94a6-3e32d9885d9c","5038685806838408"
"2d67e5d4eab9004f745c2eaff169be12b6be6a6816eac70b8497bcfefa092f08","[email protected]","ef678e73-0b90-4612-9ad0-00401c24b3c8","67616352246247000"

Purviewでスキャンするとこんな感じになります。

実装

Slackクライアントの設定

事前に用意したアクセス トークンと通知先のチャネルIDを指定します。

from slack_sdk import WebClient
SLACK_ACCESS_TOKEN = 'アクセス トークン'
CHANNEL_ID = 'チャネルID'

# Slackクライアントを作成
slack_client = WebClient(SLACK_ACCESS_TOKEN)

EventHubの利用設定

EventHubの接続文字列はPurviewアカウントのプロパティの"アトラス Kafka エンドポイントのプライマリ接続文字列"から入手できます。(Endpoint=sb://atlas-xxx
・・・的なやつ)
EVENTHUB_NAMEは"atlas_entities"固定です。

import sys
import asyncio
import nest_asyncio # Jupyter Notebookでのエラー対応
from pprint import *
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

# Connection strings
EVENTHUB_CONNECTION_STR = 'EventHubの接続文字列'
EVENTHUB_NAME = 'atlas_entities'
STORAGE_CONNECTION_STR = 'ストレージアカウントの接続文字列'
BLOB_CONTAINER_NAME = 'コンテナ名'

EventHubから情報を取得した結果クレジットカード番号(MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER)とメールアドレス(MICROSOFT.PERSONAL.EMAIL)に該当したか判断するためのリストを設定。
サポートされる分類の一覧はここ。正式名称はPurview Studioから。

# 通知対象クラスの指定
class_list = ['MICROSOFT.FINANCIAL.CREDIT_CARD_NUMBER', 'MICROSOFT.PERSONAL.EMAIL']

メッセージの受信とSlackへの通知

async def on_event(partition_context, event):
    event_data = event.body_as_json()
    eventType = event_data['message']['operationType']

    # クラスが追加もしくは更新された場合
    if eventType == 'CLASSIFICATION_ADD' or eventType == 'CLASSIFICATION_UPDATE':
        classificationName = event_data['message']['entity']['classificationNames'][0]
        # 通知対象のクラスにマッチした場合
        if classificationName in class_list:
            message = f"""\
通知対象の情報({classificationName})が検出されました。
  name={event_data['message']['entity']['attributes']['name']}
  qualifiedName={event_data['message']['entity']['attributes']['qualifiedName']}
            """

            # Slackに通知
            slack_client.chat_postMessage(channel=CHANNEL_ID, text=message, icon_emoji=':whale:', username='Purview Notice')

    await partition_context.update_checkpoint(event)
async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
async def main():
    print("Starting stream...", file=sys.stderr)
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,  # For load-balancing and checkpoint. Leave None for no load-balancing.
    )
    async with client:
        await receive(client)

非同期処理がNotebookでエラーとなったので、nest_asyncio.apply()を呼んでいます。

if __name__ == '__main__':
    print("Python process started.", file=sys.stderr)
    nest_asyncio.apply()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

参考

EventHubからの非同期のデータ受信なども含めてここが参考になります。