Azureストリーム解析論


概要


こんにちは👋 , このブログ記事では、AOURE Stream Analyticsを使ってストリーム処理ロジックを作成して、インターネットのデータを収集します.詳細については、Azure Stream Analyticsのアラート処理クエリについて説明します.しきい値に到達したときに警告をトリガーし、テレメトリが警告していないときにアラートをクリアします.

Azureストリーム解析の設定


Azure Stream Analyticsを設定するには、Azureアカウントが必要です.あなたが1を持たないならば、Azureアカウントhereをつくってください.
AzureポータルまたはCLIまたは他のIAC(コードとしてのインフラストラクチャ)ツールからAzure Stream Analyticsを設定できます.私は、開発とテスト目的のためにAzure Stream Analytics Tool extensionVisual Studio Codeをインストールすることを勧めます.Azure Stream Analyticsの仕事を設定するために、このdocumentationに従ってください.

ITOテレメトリーとアラート処理


さて、IOTテレメトリとアラート処理シナリオについて話しましょう.リアルタイムでデータの流れを放射しているセンサーを持っている工場で機械を持っている産業オートメーション会社を考えてください.このシナリオでは、プラントマネージャは、センサーデータからリアルタイムの洞察力を持っていて、アラートが起こるとき、行動を起こしたいです.閾値を設定することができますし、測定値が警告を識別するためにしきい値を超えているかどうかを検出します.このブログでは、センサーの測定値は-温度と湿度を考慮します.
我々は、以下の要件でアラートを識別するためにAzure Stream Analytics(ASA)質問を書くつもりです.
  • は、測定値が閾値を超えているときに警告を識別する.

  • イベントにEventhubまたはServiceBusのトピックにアラートイベントを送信します.
  • デザインゴール:測定値が閾値を横切って、電子メール警報でユーザーをスパムでないときだけ、警報は送られなければなりません.
  • 温度が80の閾値より下であるならば、警報は第2の記録だけで、そして、あらゆる記録に関して起こりません.
    { "time": "2021-08-20T20:47:53", "temp": "78",... "deviceId": "sensor-001" }
    { "time": "2021-08-20T20:47:54", "temp": "86",... "deviceId": "sensor-001" }
    { "time": "2021-08-20T20:47:55", "temp": "81",... "deviceId": "sensor-001" }
    
  • デバイスが正常に戻るときに“アラートクリア”イベントを送信します.
  • は、デバイスが構成されたエスカレーション持続時間のために警戒しているとき、「エスカレートされた」イベントを送ります.
  • ASAクエリパターンで使用可能なオプションは、上記の要件を実装するものを見てみましょう.

    テレメトリにおける警報の検出


    まず、すべての入力データを出力に渡すパススルークエリを作成します.この中で、私は開発中の明快さのためのサンプルJSONとして限定データを使用しました.あなたが同じことを使いたいならば、hereからデータをダウンロードしてください.
    SELECT
        *
    INTO
        [output]
    FROM
        [deviceTelemetry]
    
    
    テスト結果は

    ここで、温度が80より大きいとき、アラートを見つけるために解析学を適用しましょう.クエリの出力には、警告があるかどうかを示すHasAlertフラグがあります.
  • に注意してください、私たちは、ストリーム解析のクエリの閾値をハードコーディングする代わりに、動的に規則を適用するために参照入力を使用することができます.このブログの簡潔さのために、質問のために参照入力をする方法または私がもう一つのブログ柱でカバーする方法に関して、hereを参照してください.
  • WITH ProcessedTelemetry as (
        SELECT
            deviceId, 
            System.timestamp() as time,
            AVG(temp) as temp,
            CASE WHEN AVG(temp) > 80 THEN 1 ELSE 0 END as hasAlert
        FROM
            [deviceTelemetry] TIMESTAMP BY time
        GROUP BY 
            System.Timestamp(), deviceId -- Run immediately on every event or use any temporal window here
    )
    
    SELECT * FROM ProcessedTelemetry
    
    

    グッド!今、我々はイベントが警告しているかどうか知っている.我々は、テレメトリがアラート状態に入ったときのみ警告を送信する必要があります.上記の表では、デバイスがアラートに入る2行目でなければなりません.ASAクエリの解析関数とは何かを見てみましょう.
    ラグ-遅れ解析演算子は、特定の制約の範囲内でイベントストリームの「前」のイベントを調べることができます.変数の成長率を計算するのに非常に役に立ちます.そして、変数が閾値を横切るとき、または、条件が始まるか、本当であるのを止めるとき、検出します.
    次のクエリでは、遅延スコープ関数を使用して、テレスコープで前のイベントHasAlertフィールドを検索します.結果を実行すると、HasAlertとPreviousAlarmフィールドを見ることができます.PreviousAlarmは、ストリームの前のhasAlertフィールドとは何かを示します.
    WITH ProcessedTelemetry as (
        SELECT
            deviceId, 
            System.timestamp() as time,
            AVG(temp) as temp,
            CASE WHEN AVG(temp) > 80 THEN 1 ELSE 0 END as hasAlert
        FROM
            [deviceTelemetry] TIMESTAMP BY time
        GROUP BY 
            System.Timestamp(), deviceId -- Run immediately on every event
    ),
    TelemetryWithLastAlert as (
        SELECT
            deviceId, 
            time,
            temp,
            hasAlert,
            LAG(hasAlert) OVER (PARTITION BY deviceId LIMIT DURATION(day, 1)) as previousAlert
        FROM
            ProcessedTelemetry        
    )
    
    SELECT * FROM TelemetryWithLastAlert
    
    

    したがって、正確にデバイスがアラート状態に入ったときにイベントを検出するには、HasAlertとPreviousAlert状態を比較することによって問い合わせを簡単にします.
    WITH ProcessedTelemetry as (
        SELECT
            deviceId, 
            System.timestamp() as time,
            AVG(temp) as temp,
            CASE WHEN AVG(temp) > 80 THEN 1 ELSE 0 END as hasAlert
        FROM
            [deviceTelemetry] TIMESTAMP BY time
        GROUP BY 
            System.Timestamp(), deviceId -- Run immediately on every event
    ),
    TelemetryWithLastAlert as (
        SELECT
            deviceId, 
            time,
            temp,
            hasAlert,
            LAG(hasAlert) OVER (PARTITION BY deviceId LIMIT DURATION(day, 1)) as previousAlert
        FROM
            ProcessedTelemetry        
    )
    
    SELECT 
        deviceId, 
        time, 
      temp, 
      hasAlert 
    INTO
        alertHub
    FROM
        TelemetryWithLastAlert 
    WHERE
        hasAlert = 1 AND (previousAlert IS NULL OR previousAlert = 0)
    
    

    これまで、我々は我々の要件の1、1/2を見てきました.テレメトリのアラート状態を検出し、状態が正常から警告に変更されたときだけ、ハブまたはサービスバスの話題にイベントを引き起こす.

    アラートクリアイベント


    現在のアラート状態と以前のアラート状態の両方が利用可能であるため、デバイスが正常な状態に戻るときに同じようなクエリを行うことができます.
    ...
    AlertEvents as (
        SELECT 
            deviceId, 
            time, 
            temp, 
            hasAlert,
            'alertRaised' as eventType
        FROM
            TelemetryWithLastAlert 
        WHERE
            hasAlert = 1 AND (previousAlert IS NULL OR previousAlert = 0)
    
        UNION
    
        SELECT 
            deviceId, 
            time, 
            temp, 
            hasAlert,
            'alertCleared' as eventType
        FROM
            TelemetryWithLastAlert 
        WHERE
            hasAlert = 0 AND previousAlert = 1
    )
    
    SELECT * INTO alertHub FROM AlertEvents
    
    
    テスト結果は

    警告エスカレーションイベント


    要件に最終的な項目を実装するには、最終的な項目は-しきい値間隔後に警告をエスカレートすることです.ここでは、最後のAnalyticsクエリを使用してアラートが開始され、アラートの持続時間を見つける時間を見つけることができます.ですので、デバイスが設定された閾値以上のアラート状態でイベントをエスカレートできます.質問を拡張することができます-アラートがエスカレートされた場合にのみイベントを発生させる最後のエスカレーション期間を検索します.私はそれをさらに拡張するためにあなたのエクササイズのためにここにそれを残す.
    ラスト
    最後の解析演算子は、定義された制約内のイベントストリーム内の最新イベントを検索することができます.最後の既知の良い値(例えばNULLでない)を計算するようなシナリオで有用です.
    TelemetryWithDuration as (
        SELECT
            deviceId, 
            time,
            temp,
            hasAlert,
            previousAlert,
            DATEDIFF(second,LAST(time) OVER (PARTITION BY deviceId LIMIT DURATION(day, 1) WHEN hasAlert = 1 AND previousAlert = 0), time) as alertDuration
        FROM
            TelemetryWithLastAlert       
    ),
    
    

    ここでレポをチェックしてください.

    クシムムトゥ / デバイステレメトリー


    デバイステレメトリ分析


    完全なクエリ結果のサンプル

    結論


    結論として、Azure Stream Analytics Queryはリアルタイム分析によってリアルタイムで複雑なイベントを処理する能力を提供します.私たちはiOTストリーミングデータのリアルタイム分析を行うことによって必要な結果を達成するために広範な解析関数を使用することができます.このブログの記事では、アラート処理シナリオを探究-アラートを検出し、アラートをクリアし、イベントハブまたはサービスバスのトピックによって処理されるエスカレートイベントを警告します.
    私はクラウド、コンテナ、IOT、devpsについてたくさん書きます.もし興味があれば、あなたがもういないなら私についてきてください.私に従ってください、またはhttps://blog.sivamuthukumar.comで私のブログをチェックしてください.