セッションを持つAzure関数における順序待ち行列処理


順序について話しましょう.それは私のお気に入りのトピック、何かの一つですI've blogged about extensively before . Azure関数の以前に処理された処理はAzureイベントハブのようなイベントストリームでのみ可能でしたが、今日はどのようにサービスバスキューとトピックの順序を維持できるかを示したいと思います.
表面上はかなりまっすぐに見えます.キューからメッセージを処理することができます.マシン上で実行している単純なサービスのために、それは達成するのがかなり簡単です.しかし、どのように、私はスケールで処理したいときにキューメッセージの順序を保存しますか?Azure関数のような何かで、アクティブなインスタンスの数十人のメッセージを処理することができます.
病院で患者を扱うメッセージシステムの簡単な例を使いましょう.私は患者ごとにいくつかのイベントを想像してください:
  • 患者到着
  • 患者が部屋を割り当てた
  • 患者は治療を受ける
  • 患者退院
  • 私は自分の治療を処理する前に、私は順序のメッセージを処理し、潜在的に患者を放電することを確認したい!
    何が起こるかを見るために、いくつかの速い実験を走らせましょう.このために、私はこれらの4つのメッセージを送って、彼らを処理する1000人の患者をシミュレーションします.

    デフォルトと順序


    ちょうどキューの上でトリガーする単純なAzure機能でこれを試みましょう.私は特別な何かをするつもりはない、ちょうどキューにトリガし、それがredisキャッシュ上のリストに処理の操作を押してください.
    public async Task Run(
        [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString")]Message message, 
        ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
        await _client.PushData((string)message.UserProperties["patientId"], Encoding.UTF8.GetString(message.Body));
    }
    
    このキューにデータ(4メッセージごとに)の1000患者の価値を送信した後、REDISキャッシュは処理後にどのように見えますか?よく患者のいくつかは偉大に見えます.私が患者を見ているとき、私は4を見ます
    >lrange Patient-$4 0 -1
    1) "Message-0"
    2) "Message-1"
    3) "Message-2"
    4) "Message-3"
    
    すごい!すべての4つのイベントは患者4のために送られて、順番に処理されました.しかし、患者2を見るならば
    >lrange Patient-$2 0 -1
    1) "Message-1"
    2) "Message-2"
    3) "Message-0"
    4) "Message-3"
    
    この場合、2つの他のメッセージがすでに処理されたあとまで、「患者到着」メッセージを処理し終えませんでした.ここで何が起こったのですか.Azure Service Busは注文を保証するので、私のメッセージはなぜ故障しますか?
    デフォルトでは、キュートリガはいくつかのことを行います.まず、すべてのインスタンスが起動すると、同時に一連のメッセージを処理します.By default インスタンスは同時に32メッセージを処理します.それは、患者のためにすべての4つのメッセージを同時に処理しているかもしれません、そして、彼らが送られたより、彼らは異なる順序で終わります.よく修正するには簡単に見えるように、ちょうど1に並行性を制限しましょう.

    アンチパターン:制限スケールと並行性


    私が見る上記の問題に対する最も一般的な解決策は、ここにあります.並行して、32の代わりに一度にプロセス1メッセージだけに制限しましょう.そのためにhost.json ファイルと設定maxConcurrentCalls 1になる.現在、各インスタンスは一度に1メッセージだけを処理します.私は再び同じテストを実行します.
    最初に、それは超低速です.各インスタンスは一度に1を処理するので、4000キューのメッセージをかむのに長い時間がかかります.さらに悪い?その後の結果をチェックすると、患者の何人かはまだ故障している!何が起こっているのですか.インスタンスの並行性を1に制限したにもかかわらず、Azure関数は複数のインスタンスに対して私をスケールアウトしました.それで、私がスケーリングした20の機能アプリインスタンスを持っているならば、私は同時に処理されている20のメッセージ(1つのインスタンスにつき1)を持ちます.それは、同じ患者からのメッセージが同時に処理されることができる場所にまだ入ります.私はまだ注文処理を保証していません.
    ここの修理?多くの人々はazure関数からスケールを制限したい.中it's technically possible , それは私のスループットをさらに傷つけるでしょう.今1つだけのメッセージを一度に処理することができます、高トラフィックの間、私は私の機能を維持することができない可能性があります患者イベントの大きなバックログを取得するつもりです.

    救助へのセッション


    私がそれをここで終えたならば、これがこんな悲しいブログ柱でないでしょうか?より良い方法があります!以前は、パーティションやバッチのために、イベントハブを使用することができます.you can guarantee ordering . ここでの課題は、時にはキューがリトライとデッドレタリングのようなトランザクションの質を与えられたジョブのための正しいメッセージブローカーです.そして今、あなたはキューを使用し、サービスバスセッションで順序を得ることができます🎉.
    それで、セッションは何ですか?セッションでは、メッセージのグループの識別子を設定できます.セッションからメッセージを処理するために、あなたは最初に「ロック」をしなければなりません.その後、個別にセッションから各メッセージを処理することができます(通常のキューの完全なロック/完全な意味論を使用して).セッションの利点は、複数のインスタンス間でハイスケールで処理するときでも、順序を維持することができます.私たちは20のAzure機能アプリのインスタンスのような何かを持っていた前に考えて、すべて同じキューに競合します.20にスケーリングするのではなく、すべての20のインスタンスは、それぞれ独自の利用可能なセッションを「ロック」し、そのセッションからイベントを処理するだけです.セッションはまた、セッションからのメッセージが順番に処理されることを保証します.
    セッションを動的にいつでも作成することができます.Azure関数のインスタンスがスピンし、最初に「ロックされていないセッションIDを持つメッセージがありますか?」もしそうならば、それはセッションをロックして、順番に処理を始めます.セッションがもはや利用可能なメッセージを持っていないとき、Azure関数はロックを解放し、次の利用可能なセッションに移動します.メッセージは、最初にメッセージが属しているセッションをロックしなければならずに処理されません.
    上の例では、同じ4000のメッセージ(1000人の患者のための4つの患者イベント)を送るつもりです.この場合、患者IDをセッションIDとして設定します.各Azure関数のインスタンスは、セッション上でロックを取得し、利用可能なメッセージを処理し、利用可能なメッセージを持つ別の患者に移動します.

    Azure関数におけるセッションの使用


    セッションは現在Microsoft.Azure.WebJobs.Extensions.ServiceBus version>= 3.1.0を使用した拡張子.だから最初に、拡張子を引っ張ります.
    Install-Package Microsoft.Azure.WebJobs.Extensions.ServiceBus -Pre
    
    そして、セッションを有効にするために、私の関数コードに最も簡単なコードを変更しますisSessionsEnabled = true ):
    public async Task Run(
        [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message, 
        ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
        await _client.PushData(message.SessionId, Encoding.UTF8.GetString(message.Body));
    }
    
    私も、私がセッション有効なキューまたは話題を使用していることを確認する必要があります.

    そして、メッセージをキューにプッシュするとき、私は右を設定しますsessionId 私が送る各々の患者メッセージのために.
    機能を公開した後、私は4000メッセージを押します.キューはかなり速くドローされます.なぜなら、スケーリングされたインスタンス間で同時に複数のセッションを処理できるからです.テストを実行した後、REDISキャッシュをチェックします.予想通り、すべてのメッセージが処理され、1つの患者ごとに順番に処理されたことがわかります.
    >lrange Patient-$10 0 -1
    1) "Message-0"
    2) "Message-1"
    3) "Message-2"
    4) "Message-3"
    
    >lrange Patient-$872 0 -1
    1) "Message-0"
    2) "Message-1"
    3) "Message-2"
    4) "Message-3"
    
    それで、セッションのための新しいAzure機能サポートで、私は全体的なスループットに犠牲を払わないで、順番にサービスバス待ち行列または話題からメッセージを処理することができます.私は動的に新しいか既存のセッションにメッセージを加えることができて、セッションのメッセージが彼らがサービスバスによって受け取られる順序で処理されるという確信を持っています.
    あなたは私がテストとロードメッセージをロードするために使用する完全なサンプルを見ることができますmy GitHub repo . The master 支店はすべて整然としているout-of-order ブランチは、デフォルトとアウトオーダーの実験です.