Pythonを使用したAzure IOTデバイスの実装


Azure IOTについての良いことは、C、C - CHEN、Java、Nodeを含む一般的なプログラミング言語で既に利用可能なSDKがあることです.JSとPythonは、したがって、我々のデバイスと好ましい言語を満たす必要があります.特定のSDKの詳細については、次のリンクを参照してください.
  • Constrained Device SDKs
  • Unconstrained Device SDKs
  • このブログのポストでは、ハブからイベントを受け取るために他の間、パイソンデバイスSDKを使用しているカップルAzure Iotデバイスを作成することに焦点があります.
    シナリオは、アラームエージェントによって管理されるアラームを持っているということです.アラームの状態が変化すると、アラームエージェントは、アラームステータスをIOTハブに送信します.IOTハブは、アラーム監視エージェントに警報状態を通過する.

    必要なもの

  • 紺碧の口座
  • アジュールパワーシェル
  • アクティブAzure IOTハブ
  • Python 3環境
  • 必要なIOTデバイスの作成と接続の確認


    Azure IOTデバイスを作成しましょうAlarm Agent and Alarm Monitor Agent .
    PowerShellコンソールを開き、Azure PowerShellを使用してサブスクリプションで署名してください.その後、次に示すようにデバイスを作成します.
        PS D:\Workspace\IoTHub> $AlarmAgent= Add-AzIotHubDevice -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmAgent" -AuthMethod "shared_private_key"
        PS D:\Workspace\IoTHub>
        PS D:\Workspace\IoTHub> $AlarmMonitorAgent = Add-AzIotHubDevice -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmMonitorAgent" -AuthMethod "shared_private_key"
        PS D:\Workspace\IoTHub>
    
    次に、接続文字列を取得し、後で使用できる環境変数に割り当てます.
        PS D:\Workspace\IoTHub> $TempObject = Get-AzIotHubDeviceConnectionString -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmAgent" -KeyType secondary
        PS D:\Workspace\IoTHub> $Env:AlarmAgentConnectionString = $TempObject.ConnectionString
        PS D:\Workspace\IoTHub> $TempObject = Get-AzIotHubDeviceConnectionString -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmMonitorAgent" -KeyType secondary
        PS D:\Workspace\IoTHub> $Env:AlarmMonitorAgentConnectionString = $TempObject.ConnectionString
        PS D:\Workspace\IoTHub> $Env:AlarmAgentName = "AlarmAgent"
        PS D:\Workspace\IoTHub> $Env:AlarmMonitorAgentName = "AlarmMonitorAgent"
        PS D:\Workspace\IoTHub>
    
    IOTハブと相互作用するための基底クラスを作成します.この時点で、それはちょうど接続して切断することができる必要があります.
        # DeviceAgent.py
    
        from azure.iot.device.aio import IoTHubDeviceClient
    
        class DeviceAgent:
            def __init__(self, conn_str, agent_name, logging):
                self.agent_name = agent_name
                self.logging = logging
    
                self.device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)
    
            async def connect(self):
                await self.device_client.connect()
    
            async def disconnect(self):
                await self.device_client.disconnect()
    
            def is_connected(self):
                return self.device_client.connected
    
    今すぐ作成AlarmAgentDeviceAgent
        # AlarmAgent.py
    
        from DeviceAgent import DeviceAgent
    
        class AlarmAgent(DeviceAgent):
            def __init__(self, conn_str, agent_name, destination_name, logging):
                super(AlarmAgent, self).__init__(conn_str, agent_name, logging)
                self.destination_name = destination_name
    
    and AlarmMonitorAgent .
        # AlarmMonitorAgent.py
    
        from DeviceAgent import DeviceAgent
    
        class AlarmMonitorAgent(DeviceAgent):
            def __init__(self, conn_str, agent_name, logging):
                super(AlarmMonitorAgent, self).__init__(conn_str, agent_name, logging)
    
    今iotハブとの対話をメインアプリケーションコードを作成します.
        # App.py
    
        import asyncio
        import os
        import logging
        import sys
        from time import gmtime
    
        from AlarmAgent import AlarmAgent
        from AlarmMonitorAgent import AlarmMonitorAgent
    
        REQUIRED_DEVICES = 2
    
        logging.basicConfig(filename='events.log', encoding='utf-8', format='%(asctime)s %(message)s', level=logging.INFO)
        logging.Formatter.converter = gmtime
    
        def all_devices_are_connected(devices):
            return len(devices) == REQUIRED_DEVICES
    
        async def disconnect_devices(devices):
            coroutines = []
            for device in devices:
                coroutines.append(device.disconnect())
            await asyncio.gather(*coroutines)
    
        async def main():
    
            alarm_agent = None
            alarm_monitor_agent = None
    
            connected_devices = []
    
            try:
                logging.info("App Started...")
    
                # Get neccessary application configs
                alarm_agent_conn_str = os.getenv("AlarmAgentConnectionString")
                alarm_monitor_agent_conn_str = os.getenv("AlarmMonitorAgentConnectionString")
                alarm_agent_name = os.getenv("AlarmAgentName")
                alarm_monitor_agent_name = os.getenv("AlarmMonitorAgentName")
    
                # Connect our devices to IoT hub
                alarm_agent = AlarmAgent(alarm_agent_conn_str, alarm_agent_name, alarm_monitor_agent_name, logging)
                alarm_monitor_agent = AlarmMonitorAgent(alarm_monitor_agent_conn_str, alarm_monitor_agent_name, logging)
                await asyncio.gather(alarm_agent.connect(), alarm_monitor_agent.connect())
    
                if alarm_agent.is_connected():
                    connected_devices.append(alarm_agent)
    
                if alarm_monitor_agent.is_connected():
                    connected_devices.append(alarm_monitor_agent)
    
                if all_devices_are_connected(connected_devices) is False:
                    await disconnect_devices(connected_devices)
    
                    # Can't do much if AlarmAgent and AlarmMonitorAgent are not both connect
                    logging.error("Failed to connect necessary devices.")
                    sys.exit()
    
                logging.info("All devices are connected.")
                await asyncio.sleep(5)
    
                # Nothing else to do, clean up as required.
                await disconnect_devices(connected_devices)
    
                logging.info("App Stopped!")
            except Exception:
                logging.error(Exception.with_traceback())
    
        if __name__ == "__main__":
            asyncio.get_event_loop().run_until_complete(main())
    
    では、上記のコードをすばやくまとめてみましょう.
  • 環境変数として格納されたneccessaryアプリケーションの設定を読んでください.
  • それぞれのクラスに必要なパラメータを渡すことでデバイスをインスタンス化します.
  • デバイスを接続しますasyncio.gather() 私たちは同時に接続を開始することができますconnect() いずれにしても非同期的にメソッド).
  • 一度asyncio.gather() 完了し、両方のデバイスが接続されていることを確認します.
  • 両方のデバイスが接続されていない場合は、接続して終了するものを切断することはできません.
  • それ以外の場合は、5秒間スリープして切断して終了します.
  • それは、この時点で非常に簡単な受精テストです.すべてが順調に進んだならば、我々は両方の装置が首尾よくIOTハブに接続したことをログで見るべきです.

    新しいアラーム状態を報告する


    ここで焦点はIOTハブに送られるデータを準備することです.
    更新しましょうAlarmAgent したがって、それはランダムにアラームステータスをシミュレートし、新しいステータスが現在のステータスとは異なる場合は、それを報告するAlarmMonitorAgent .
    変更点DeviceAgent
        # DeviceAgent.py
    
        ...
    
        from enum import Enum
    
        class DeviceAgentState(Enum):
            UNKNOWN = 1
            IDLE = 2
            NEW_EVENT_TO_PROCESS = 3
            PROCESSING_DEVICE_TO_CLOUD_STARTED = 4
            PROCESSING_DEVICE_TO_CLOUD_COMPLETED = 5
    
        class DeviceAgent:
            def __init__(self, conn_str, agent_name, logging):
                ...
                self.state = DeviceAgentState.IDLE
    
            ...
    
            async def send_parcel(self, parcel):
                await self.device_client.send_message(parcel)
    
    変更点AlarmAgent
        # AlarmAgent.py
    
        ...
    
        import random
        from datetime import datetime, timezone
    
        from DeviceAgent import DeviceAgentState
        from pb_AlarmStatus_pb2 import pb_AlarmStatus
        from pb_Parcel_pb2 import pb_Parcel
    
        class AlarmAgent(DeviceAgent):
            def __init__(self, conn_str, agent_name, destination_name, logging):
                ...
                self.current_alarm_status = False
                self.new_alarm_status = False
                self.parcel_to_send = None
    
            ...
    
            def simulate_event(self):
                return bool(random.getrandbits(1))
    
            def pack_alarm_event(self):
                self.logging.info("Packing event: %r" % self.current_alarm_status)
                date_time_utc = datetime.now(timezone.utc)
                alarm_status = pb_AlarmStatus()
                alarm_status.alarm_active = self.current_alarm_status
                alarm_status.time_utc = "{}".format(date_time_utc.time())
                alarm_status.date_utc = "{}".format(date_time_utc.date())
    
                self.logging.info("*************************************************************************")
                self.logging.info("Packing event: Active: %r, time: %r, date: %r" % (alarm_status.alarm_active, alarm_status.time_utc, alarm_status.date_utc))
                self.logging.info("*************************************************************************")
    
                parcel = pb_Parcel()
                parcel.source.name = "Door Alarm"
                parcel.source.local_id = "1"
                parcel.source.domain_agent = self.agent_name
                parcel.source.domain = "Device Domain"
                parcel.destination.name = "Alarm Monitor"
                parcel.type = "pb_AlarmStatus"
                parcel.content = str(alarm_status.SerializeToString(), 'utf-8')
                self.parcel_to_send = str(parcel.SerializeToString(), 'utf-8')
    
            async def do_work(self):
                if self.state is DeviceAgentState.IDLE or self.state is DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_COMPLETED:
                    self.new_alarm_status = self.simulate_event()
    
                    if self.new_alarm_status == self.current_alarm_status:
                        self.state = DeviceAgentState.IDLE
                    else:
                        self.current_alarm_status = self.new_alarm_status
                        self.state = DeviceAgentState.NEW_EVENT_TO_PROCESS
                    return
    
                if self.state is DeviceAgentState.NEW_EVENT_TO_PROCESS:
                    self.pack_alarm_event()
                    self.state = DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_STARTED
                    return
    
                if self.state is DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_STARTED:
                    await self.send_parcel(self.parcel_to_send)
                    await asyncio.sleep(0.5) # Delayed for test
                    self.parcel_to_send = None
                    self.state = DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_COMPLETED
                    return
    
    変更点App
        # App.py
    
        ...
    
        import time
    
        ...
    
        async def main():
    
            ...
    
            try:
                ...
    
                start_time = time.time()
    
                while True:
                    await alarm_agent.do_work()
    
                    # Let alarm agent runs for 5 sec
                    if (time.time() - start_time) > 5:
                        break
    
                ...
    
                # Give alarm monitor some time before cleaning up connections.
                await asyncio.sleep(5)
    
        if __name__ == "__main__":
            asyncio.get_event_loop().run_until_complete(main())
    
    上のコードの主な変更点はAlarmAgent.do_work() メソッド.The main() メソッドは、継続的に5秒の作業を行うには、アラームエージェントに指示し、それが特定のタスクを実行する状態に応じて.E :
  • インならIDLE , PROCESSING_DEVICE_TO_CLOUD_COMPLETED
  • いいえ保留中のタスクは、新しいアラームの状態をシミュレートするために行う
  • アラーム状態が変更された場合は、NEW_EVENT_TO_PROCESS
  • インならNEW_EVENT_TO_PROCESS
  • 新規作成parcel 送信するオブジェクト.
  • The parcel オブジェクトはソースの詳細(この小包トランザクションを開始したデバイスまたはサービス)を持ちます.
  • The parcel オブジェクトはバックエンドサービス(既に作成されたServerlessな関数のコレクション)に必要なデスティネーションの詳細を持っています.
  • The parcel オブジェクトは、受信者がそれを処理できるように、データコンテンツの詳細があります.
  • 状態を設定するPROCESSING_DEVICE_TO_CLOUD_STARTED
  • インならPROCESSING_DEVICE_TO_CLOUD_STARTED
  • 小包を処理するためのIOTハブに送る
  • 状態を設定するPROCESSING_DEVICE_TO_CLOUD_COMPLETED
  • 着信アラームステータスレポートの処理


    ここで焦点はIOTハブからのデータの取り扱いに関するものです.
    更新しましょうAlarmMonitorAgent したがって、着信アラームステータスの更新を処理することができます.
        # AlarmMonitorAgent.py
    
        ...
        from pb_AlarmStatus_pb2 import pb_AlarmStatus
        from pb_Parcel_pb2 import pb_Parcel
        from azure.iot.device import MethodResponse
        from google.protobuf import json_format
    
        class AlarmMonitorAgent(DeviceAgent):
            def __init__(self, conn_str, agent_name, logging):
                ...
                self.device_client.on_method_request_received = self.method_request_handler
    
            async def method_request_handler(self, method_request):
                status_code = 200
                payload = {"result": True, "data": "parcel handled"}
    
                if method_request.name != "ProcessMessage":
                    status_code = 404
                    payload = {"result": False, "data": "unknown method request"}
    
                parcel = json_format.ParseDict(method_request.payload, pb_Parcel(), True)
    
                if parcel is None or parcel.type != "pb_AlarmStatus":
                    status_code = 400
                    payload = {"result": False, "data": "bad parcel received"}
                else:
                    alarm_status = pb_AlarmStatus()
                    alarm_status.ParseFromString(bytes(parcel.content, 'utf-8'))
                    self.logging.info("****************************************************************************")
                    self.logging.info("Alarm status received from: %r" % (parcel.source.name))
                    self.logging.info("alarm_active: %r, time_utc: %r, date_utc: %r" % (alarm_status.alarm_active, alarm_status.time_utc, alarm_status.date_utc))
                    self.logging.info("****************************************************************************")
    
                method_response = MethodResponse.create_from_method_request(method_request, status_code, payload)
                await self.device_client.send_method_response(method_response)
    
    AlarmMonitorAgent が初期化され、直接メソッドハンドラmethod_request_handler が登録された.したがって、IOTハブがアラーム状態更新を受け取るとき、それはこのメソッドを起動します.
    価値がある1つのものはペイロードがJSON形式にあるということですcurrent limitation of Azure IoT Hub direct method , それで、我々はこれをprotobufメッセージに変える必要があります.

    クイックテストラン


    Python環境がアクティブになっていることを確認しますpython .\App.py
    そして、我々がファイルを見るならばevents.log , IOTハブに送られているものを見ることができます.

    例えば、
        2022-02-14 12:44:14,182 *************************************************************************
        2022-02-14 12:44:14,182 Packing event: Active: True, time: '12:44:14.182005', date: '2022-02-14'
        2022-02-14 12:44:14,182 *************************************************************************
    
    を送信するAlarmAgent , and
        2022-02-14 12:44:14,908 ****************************************************************************
        2022-02-14 12:44:14,908 Alarm status received from: 'Door Alarm'
        2022-02-14 12:44:14,908 alarm_active: True, time_utc: '12:44:14.182005', date_utc: '2022-02-14'
        2022-02-14 12:44:14,909 ****************************************************************************
    
    を受信するAlarmMonitorAgent .

    This blog post was originally posted on my blog site An IoT Odyssey