Pythonを使用したAzure IOTデバイスの実装
17196 ワード
Azure IOTについての良いことは、C、C - CHEN、Java、Nodeを含む一般的なプログラミング言語で既に利用可能なSDKがあることです.JSとPythonは、したがって、我々のデバイスと好ましい言語を満たす必要があります.特定のSDKの詳細については、次のリンクを参照してください. Constrained Device SDKs Unconstrained Device SDKs このブログのポストでは、ハブからイベントを受け取るために他の間、パイソンデバイスSDKを使用しているカップルAzure Iotデバイスを作成することに焦点があります.
シナリオは、アラームエージェントによって管理されるアラームを持っているということです.アラームの状態が変化すると、アラームエージェントは、アラームステータスをIOTハブに送信します.IOTハブは、アラーム監視エージェントに警報状態を通過する.
紺碧の口座 アジュールパワーシェル アクティブAzure IOTハブ Python 3環境
Azure IOTデバイスを作成しましょう
PowerShellコンソールを開き、Azure PowerShellを使用してサブスクリプションで署名してください.その後、次に示すようにデバイスを作成します.
環境変数として格納されたneccessaryアプリケーションの設定を読んでください. それぞれのクラスに必要なパラメータを渡すことでデバイスをインスタンス化します. デバイスを接続します 一度 両方のデバイスが接続されていない場合は、接続して終了するものを切断することはできません. それ以外の場合は、5秒間スリープして切断して終了します. それは、この時点で非常に簡単な受精テストです.すべてが順調に進んだならば、我々は両方の装置が首尾よくIOTハブに接続したことをログで見るべきです.
ここで焦点はIOTハブに送られるデータを準備することです.
更新しましょう
変更点 インなら いいえ保留中のタスクは、新しいアラームの状態をシミュレートするために行う アラーム状態が変更された場合は、 インなら 新規作成 The The The 状態を設定する インなら 小包を処理するためのIOTハブに送る 状態を設定する
ここで焦点はIOTハブからのデータの取り扱いに関するものです.
更新しましょう
価値がある1つのものはペイロードがJSON形式にあるということですcurrent limitation of Azure IoT Hub direct method , それで、我々はこれをprotobufメッセージに変える必要があります.
Python環境がアクティブになっていることを確認します
そして、我々がファイルを見るならば
例えば、
シナリオは、アラームエージェントによって管理されるアラームを持っているということです.アラームの状態が変化すると、アラームエージェントは、アラームステータスをIOTハブに送信します.IOTハブは、アラーム監視エージェントに警報状態を通過する.
必要なもの
必要なIOTデバイスの作成と接続の確認
Azure IOTデバイスを作成しましょう
Alarm Agent
and Alarm Monitor Agent
.PowerShellコンソールを開き、Azure PowerShellを使用してサブスクリプションで署名してください.その後、次に示すようにデバイスを作成します.
PS D:\Workspace\IoTHub> $AlarmAgent= Add-AzIotHubDevice -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmAgent" -AuthMethod "shared_private_key"
PS D:\Workspace\IoTHub>
PS D:\Workspace\IoTHub> $AlarmMonitorAgent = Add-AzIotHubDevice -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmMonitorAgent" -AuthMethod "shared_private_key"
PS D:\Workspace\IoTHub>
次に、接続文字列を取得し、後で使用できる環境変数に割り当てます. PS D:\Workspace\IoTHub> $TempObject = Get-AzIotHubDeviceConnectionString -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmAgent" -KeyType secondary
PS D:\Workspace\IoTHub> $Env:AlarmAgentConnectionString = $TempObject.ConnectionString
PS D:\Workspace\IoTHub> $TempObject = Get-AzIotHubDeviceConnectionString -ResourceGroupName "rg-sea-multilinks" -IotHubName "ih-sea-multilinks" -DeviceId "AlarmMonitorAgent" -KeyType secondary
PS D:\Workspace\IoTHub> $Env:AlarmMonitorAgentConnectionString = $TempObject.ConnectionString
PS D:\Workspace\IoTHub> $Env:AlarmAgentName = "AlarmAgent"
PS D:\Workspace\IoTHub> $Env:AlarmMonitorAgentName = "AlarmMonitorAgent"
PS D:\Workspace\IoTHub>
IOTハブと相互作用するための基底クラスを作成します.この時点で、それはちょうど接続して切断することができる必要があります. # DeviceAgent.py
from azure.iot.device.aio import IoTHubDeviceClient
class DeviceAgent:
def __init__(self, conn_str, agent_name, logging):
self.agent_name = agent_name
self.logging = logging
self.device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)
async def connect(self):
await self.device_client.connect()
async def disconnect(self):
await self.device_client.disconnect()
def is_connected(self):
return self.device_client.connected
今すぐ作成AlarmAgent
はDeviceAgent
# AlarmAgent.py
from DeviceAgent import DeviceAgent
class AlarmAgent(DeviceAgent):
def __init__(self, conn_str, agent_name, destination_name, logging):
super(AlarmAgent, self).__init__(conn_str, agent_name, logging)
self.destination_name = destination_name
and AlarmMonitorAgent
. # AlarmMonitorAgent.py
from DeviceAgent import DeviceAgent
class AlarmMonitorAgent(DeviceAgent):
def __init__(self, conn_str, agent_name, logging):
super(AlarmMonitorAgent, self).__init__(conn_str, agent_name, logging)
今iotハブとの対話をメインアプリケーションコードを作成します. # App.py
import asyncio
import os
import logging
import sys
from time import gmtime
from AlarmAgent import AlarmAgent
from AlarmMonitorAgent import AlarmMonitorAgent
REQUIRED_DEVICES = 2
logging.basicConfig(filename='events.log', encoding='utf-8', format='%(asctime)s %(message)s', level=logging.INFO)
logging.Formatter.converter = gmtime
def all_devices_are_connected(devices):
return len(devices) == REQUIRED_DEVICES
async def disconnect_devices(devices):
coroutines = []
for device in devices:
coroutines.append(device.disconnect())
await asyncio.gather(*coroutines)
async def main():
alarm_agent = None
alarm_monitor_agent = None
connected_devices = []
try:
logging.info("App Started...")
# Get neccessary application configs
alarm_agent_conn_str = os.getenv("AlarmAgentConnectionString")
alarm_monitor_agent_conn_str = os.getenv("AlarmMonitorAgentConnectionString")
alarm_agent_name = os.getenv("AlarmAgentName")
alarm_monitor_agent_name = os.getenv("AlarmMonitorAgentName")
# Connect our devices to IoT hub
alarm_agent = AlarmAgent(alarm_agent_conn_str, alarm_agent_name, alarm_monitor_agent_name, logging)
alarm_monitor_agent = AlarmMonitorAgent(alarm_monitor_agent_conn_str, alarm_monitor_agent_name, logging)
await asyncio.gather(alarm_agent.connect(), alarm_monitor_agent.connect())
if alarm_agent.is_connected():
connected_devices.append(alarm_agent)
if alarm_monitor_agent.is_connected():
connected_devices.append(alarm_monitor_agent)
if all_devices_are_connected(connected_devices) is False:
await disconnect_devices(connected_devices)
# Can't do much if AlarmAgent and AlarmMonitorAgent are not both connect
logging.error("Failed to connect necessary devices.")
sys.exit()
logging.info("All devices are connected.")
await asyncio.sleep(5)
# Nothing else to do, clean up as required.
await disconnect_devices(connected_devices)
logging.info("App Stopped!")
except Exception:
logging.error(Exception.with_traceback())
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
では、上記のコードをすばやくまとめてみましょう.asyncio.gather()
私たちは同時に接続を開始することができますconnect()
いずれにしても非同期的にメソッド).asyncio.gather()
完了し、両方のデバイスが接続されていることを確認します.新しいアラーム状態を報告する
ここで焦点はIOTハブに送られるデータを準備することです.
更新しましょう
AlarmAgent
したがって、それはランダムにアラームステータスをシミュレートし、新しいステータスが現在のステータスとは異なる場合は、それを報告するAlarmMonitorAgent
.変更点
DeviceAgent
# DeviceAgent.py
...
from enum import Enum
class DeviceAgentState(Enum):
UNKNOWN = 1
IDLE = 2
NEW_EVENT_TO_PROCESS = 3
PROCESSING_DEVICE_TO_CLOUD_STARTED = 4
PROCESSING_DEVICE_TO_CLOUD_COMPLETED = 5
class DeviceAgent:
def __init__(self, conn_str, agent_name, logging):
...
self.state = DeviceAgentState.IDLE
...
async def send_parcel(self, parcel):
await self.device_client.send_message(parcel)
変更点AlarmAgent
# AlarmAgent.py
...
import random
from datetime import datetime, timezone
from DeviceAgent import DeviceAgentState
from pb_AlarmStatus_pb2 import pb_AlarmStatus
from pb_Parcel_pb2 import pb_Parcel
class AlarmAgent(DeviceAgent):
def __init__(self, conn_str, agent_name, destination_name, logging):
...
self.current_alarm_status = False
self.new_alarm_status = False
self.parcel_to_send = None
...
def simulate_event(self):
return bool(random.getrandbits(1))
def pack_alarm_event(self):
self.logging.info("Packing event: %r" % self.current_alarm_status)
date_time_utc = datetime.now(timezone.utc)
alarm_status = pb_AlarmStatus()
alarm_status.alarm_active = self.current_alarm_status
alarm_status.time_utc = "{}".format(date_time_utc.time())
alarm_status.date_utc = "{}".format(date_time_utc.date())
self.logging.info("*************************************************************************")
self.logging.info("Packing event: Active: %r, time: %r, date: %r" % (alarm_status.alarm_active, alarm_status.time_utc, alarm_status.date_utc))
self.logging.info("*************************************************************************")
parcel = pb_Parcel()
parcel.source.name = "Door Alarm"
parcel.source.local_id = "1"
parcel.source.domain_agent = self.agent_name
parcel.source.domain = "Device Domain"
parcel.destination.name = "Alarm Monitor"
parcel.type = "pb_AlarmStatus"
parcel.content = str(alarm_status.SerializeToString(), 'utf-8')
self.parcel_to_send = str(parcel.SerializeToString(), 'utf-8')
async def do_work(self):
if self.state is DeviceAgentState.IDLE or self.state is DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_COMPLETED:
self.new_alarm_status = self.simulate_event()
if self.new_alarm_status == self.current_alarm_status:
self.state = DeviceAgentState.IDLE
else:
self.current_alarm_status = self.new_alarm_status
self.state = DeviceAgentState.NEW_EVENT_TO_PROCESS
return
if self.state is DeviceAgentState.NEW_EVENT_TO_PROCESS:
self.pack_alarm_event()
self.state = DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_STARTED
return
if self.state is DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_STARTED:
await self.send_parcel(self.parcel_to_send)
await asyncio.sleep(0.5) # Delayed for test
self.parcel_to_send = None
self.state = DeviceAgentState.PROCESSING_DEVICE_TO_CLOUD_COMPLETED
return
変更点App
# App.py
...
import time
...
async def main():
...
try:
...
start_time = time.time()
while True:
await alarm_agent.do_work()
# Let alarm agent runs for 5 sec
if (time.time() - start_time) > 5:
break
...
# Give alarm monitor some time before cleaning up connections.
await asyncio.sleep(5)
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
上のコードの主な変更点はAlarmAgent.do_work()
メソッド.The main()
メソッドは、継続的に5秒の作業を行うには、アラームエージェントに指示し、それが特定のタスクを実行する状態に応じて.E :IDLE
, PROCESSING_DEVICE_TO_CLOUD_COMPLETED
州NEW_EVENT_TO_PROCESS
NEW_EVENT_TO_PROCESS
州parcel
送信するオブジェクト.parcel
オブジェクトはソースの詳細(この小包トランザクションを開始したデバイスまたはサービス)を持ちます.parcel
オブジェクトはバックエンドサービス(既に作成されたServerlessな関数のコレクション)に必要なデスティネーションの詳細を持っています.parcel
オブジェクトは、受信者がそれを処理できるように、データコンテンツの詳細があります.PROCESSING_DEVICE_TO_CLOUD_STARTED
PROCESSING_DEVICE_TO_CLOUD_STARTED
州PROCESSING_DEVICE_TO_CLOUD_COMPLETED
着信アラームステータスレポートの処理
ここで焦点はIOTハブからのデータの取り扱いに関するものです.
更新しましょう
AlarmMonitorAgent
したがって、着信アラームステータスの更新を処理することができます. # AlarmMonitorAgent.py
...
from pb_AlarmStatus_pb2 import pb_AlarmStatus
from pb_Parcel_pb2 import pb_Parcel
from azure.iot.device import MethodResponse
from google.protobuf import json_format
class AlarmMonitorAgent(DeviceAgent):
def __init__(self, conn_str, agent_name, logging):
...
self.device_client.on_method_request_received = self.method_request_handler
async def method_request_handler(self, method_request):
status_code = 200
payload = {"result": True, "data": "parcel handled"}
if method_request.name != "ProcessMessage":
status_code = 404
payload = {"result": False, "data": "unknown method request"}
parcel = json_format.ParseDict(method_request.payload, pb_Parcel(), True)
if parcel is None or parcel.type != "pb_AlarmStatus":
status_code = 400
payload = {"result": False, "data": "bad parcel received"}
else:
alarm_status = pb_AlarmStatus()
alarm_status.ParseFromString(bytes(parcel.content, 'utf-8'))
self.logging.info("****************************************************************************")
self.logging.info("Alarm status received from: %r" % (parcel.source.name))
self.logging.info("alarm_active: %r, time_utc: %r, date_utc: %r" % (alarm_status.alarm_active, alarm_status.time_utc, alarm_status.date_utc))
self.logging.info("****************************************************************************")
method_response = MethodResponse.create_from_method_request(method_request, status_code, payload)
await self.device_client.send_method_response(method_response)
時AlarmMonitorAgent
が初期化され、直接メソッドハンドラmethod_request_handler
が登録された.したがって、IOTハブがアラーム状態更新を受け取るとき、それはこのメソッドを起動します.価値がある1つのものはペイロードがJSON形式にあるということですcurrent limitation of Azure IoT Hub direct method , それで、我々はこれをprotobufメッセージに変える必要があります.
クイックテストラン
Python環境がアクティブになっていることを確認します
python .\App.py
そして、我々がファイルを見るならば
events.log
, IOTハブに送られているものを見ることができます.例えば、
2022-02-14 12:44:14,182 *************************************************************************
2022-02-14 12:44:14,182 Packing event: Active: True, time: '12:44:14.182005', date: '2022-02-14'
2022-02-14 12:44:14,182 *************************************************************************
を送信するAlarmAgent
, and 2022-02-14 12:44:14,908 ****************************************************************************
2022-02-14 12:44:14,908 Alarm status received from: 'Door Alarm'
2022-02-14 12:44:14,908 alarm_active: True, time_utc: '12:44:14.182005', date_utc: '2022-02-14'
2022-02-14 12:44:14,909 ****************************************************************************
を受信するAlarmMonitorAgent
.This blog post was originally posted on my blog site An IoT Odyssey
Reference
この問題について(Pythonを使用したAzure IOTデバイスの実装), 我々は、より多くの情報をここで見つけました https://dev.to/chrisdinhnz/implementing-an-azure-iot-device-using-python-412gテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol