Azure Service Bus,受信モードでのPerk-lock,ReceiveAndDelete


受信モードとは?


Service Busを使用するとき、少なくとも1回同じ部分をどのように処理するかを知りました.
そして深く理解した部分はreceiveモードです.

中はどのように処理して、少なくとも1回以上実行することを保証します!?
一時...約束してるでしょ?
msgをドラッグ&ドロップすると、Service BusにはPerk-lockとReceiveとDeltaの2つの方法があります.
この文章では、2つのモードとさまざまなオプション、およびテガがテストしたコードを簡単に記録します.

Perk-lockとReceive and Delete


Peek-LOckとは?


簡単に言えば、msgをconsumerにインポートする前に、インポートするメッセージをロックして、他のconsumerがインポートできないようにしてからmsgをインポートする方法です.
これは2つの段階に分けて行われます.
他のユーザが同じメッセージを受信しないように、次のメッセージを見つけてロックし、アプリケーションにmsgを送信します.
  • アプリケーションがmsgを処理した後、サービスバスに第2段階の完了を要求すると、サービスバスはそのmsgを消費したとマークされる.
  • アプリケーションがmsgを何らかの理由で処理できない場合、サービスバスはmsgをロック解除し、同じ消費者または他の消費者がmsgを再取得できるようにします.
    ロックタイムアウトという時間があり、その時間内に処理できない場合は、上記のようにロックを解除した後、同じ消費者または他の消費者に再受信させることができる.
    msgを処理し、サービスバスに完全な処理要求を発行する前に、アプリケーションがクラッシュした場合、サービスバスはアプリケーションの再起動時にメッセージを再送信します.このプロセスを少なくとも1回の処理と呼ぶ.同じメッセージが再送信され、2回処理される可能性があるため、アプリケーションは重複する部分を検出するために追加の論理を追加する必要があります.
    上記2回の処理が発生しないように、MessageIDを用いて新しいMessageIDをtimewindowに記録すれば、受け入れられたmsgと判断できる.
    したがって、Perk-lockは、すべてのメッセージを少なくとも1回処理しなければならないアプリケーションで使用することができる.

    Receive and Deleteとは?


    このモードは、consumerからのリクエストが受信されると、msgが使用済みとマークされ、msgがconsumerアプリケーションに返されることを示します.
    最も簡単なアニメーションで、すべてのmsgはすべての処理を必要としないアプリケーションで使用できます.

    では、今まで私たちは何を書いていますか?


    Peek-LOckはデフォルトオプションです.
    このモードを使用するには、ReceiveAndDeleteモードに設定する必要があります.

    テスト


    上記のモードでは、receive and delete方式は、最初は消費タグを用いて送信されるため、同じキューを複数の消費者が処理することはない.
    Peek-LOckをテストします

    python

    from azure.servicebus import ServiceBusClient, ServiceBusMessage
    import asyncio
    CONNECTION_STR = "{Service_Bus_Endpoint_Here}"
    QUEUE_NAME = "{Queue_Name_Here}"
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
    servicebus_client2 = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
    servicebus_client3 = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
    print("\n\n----------------------------------------------\n")
    print("Sending messages to Service Bus")
    def send_message(sender):
        for x in range(1,11):
            message = ServiceBusMessage(str(x))
            sender.send_messages(message)
            print(" number: ", str(x))
        print("Sending messages finished\n")
    with servicebus_client:
        sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
        with sender:
            send_message(sender)
    print("Receiving messages from Service Bus")
    print("----------------------------------------------\n")
    async def receiver_001():
        with servicebus_client:
            receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
            with receiver:
                for msg in receiver:
                    await asyncio.sleep(3)
                    print("receiver_001 Received: " + str(msg))
                    receiver.complete_message(msg)
    async def receiver_002():
        with servicebus_client2:
            receiver = servicebus_client2.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
            with receiver:
                for msg in receiver:
                    await asyncio.sleep(2)
                    print("receiver_002 Received: " + str(msg))
                    receiver.complete_message(msg)
    async def receiver_003():
        with servicebus_client3:
            receiver = servicebus_client2.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
            with receiver:
                for msg in receiver:
                    await asyncio.sleep(1)
                    print("receiver_003 Received: " + str(msg))
                    receiver.complete_message(msg)
    async def main():
        try:
            await asyncio.wait([
                asyncio.create_task(receiver_001()),
                asyncio.create_task(receiver_002()),
                asyncio.create_task(receiver_003())
            ])
        except:
            pass
    if __name__=='__main__':
        asyncio.run(main())
    上のコードを回すと、下のような画面が見えます.

    10を入れ、同時に3つの消費者を回しても同じmsgを持たない部分を確認できます.
    3人の消費者が同時に導入した部分を示すために作成したコードマシンでは,キューを終了する際の順序は保証できるが,処理後に私の画面で送信する順序は保証できない.

    の最後の部分


    これは次の部分しか覚えていない商売です.
    少なくとも1回の処理が必要:Perk-lock
    少なくとも1回の処理は不要:Receive and Delete