RabbiitMQ消費者確認(Consmer Acknowledgemens)


要約
RabbiitMQはデータの安全について二つの特性があります。発表者確認と消費者確認(Consmer Acknowledegements)。前者はMQサーバ(brook er)に対して、発信者にメッセージを伝える結果であり、メッセージプロトコルの拡張内容である。後者は、MQサーバ・メッセージ伝達の結果を消費者に伝えるためのメッセージ・プロトコルの定義である。本論文は主にラビットMQの消費者確認の実現を紹介する。
メッセージ伝達の標識:伝達ラベル(delivery tags)
メッセージの伝達はどのようにして表示されますか?消費者が登録された後、RabbiitMQはBaic.delivery方法を通じてメッセージを伝達し、この方法は伝達ラベルを携帯し、チャネル上のあるメッセージの伝達を一意に識別する。したがって、伝達ラベルは通路によって分けられます。伝達ラベルは単調に増加した正の整数であり、消費者クライアントがメッセージを確認する方法は伝達ラベルをパラメータとする。伝達ラベルはチャネルによって分離されているので、メッセージAがチャネル1から消費者サーバに伝達されている場合、メッセージAの確認もチャネル1を通過しなければならない。エラーがチャネル2を通じて確認されれば、RabbitiMQはプロトコル異常を持ち出してチャネル2を閉鎖する。
消費者確認モード
MQサーバノードが消費者サーバにメッセージを送るとき、このメッセージはいつ消費者によって成功的に処理されたかを決定する必要があります。メッセージプロトコルは、一般的に、消費者が接続されたMQサーバに確認応答を送信することができる確認機構を提供する。仕組みが有効かどうかは、一般的に消費者が購読するときに決めます。
使用の確認モードに応じて、RabbiitMQは、メッセージがMQサーバから送信された(TCP Socketに書き込まれた)直後に、成功した処理として扱われたり、消費者から表示された(手作業の)確認を受けた後であったりすることができる。
手動確認モード
消費者が手作業で送った確認は、次のような合意方法であることができる。
  • basic.ack(deliveryTag,multiple)は、成功メッセージを確認するために用いられる
  • basic.nack(deliveryTag,requeue,multiple)は、失敗したメッセージを確認するために用いられる
  • basic.reject(deliveryTa,requeue)は、失敗を確認するためのメッセージ
  • を使用する。
    成功が簡単であることを確認した令ラビットMQは、メッセージを送信済みとして記録し、破棄する。basic.reject方法を使用して、RabbiitMQにメッセージを送信に失敗したと記録させるが、依然として廃棄が必要である。
    自動確認モード
    自動確認モードでは、メッセージは送信後に成功したと考えられます。このようなモード損失データセキュリティは、メッセージの高スループットと交換し、消費者とのTCP接続またはメッセージ・チャネルが送信に成功する前にオフになった場合、MQサーバから送信されたメッセージは失われる。したがって、自動確認モードはデータ以外の安全性を考慮し、慎重に使用するべきです。
    自動確認モードを使うなら、もう一つ注意したいのは消費者の負荷です。手動確認モードの典型的な使用は、未処理状態のメッセージを保存するために消費者側で有限サイズのキュー(prefetch)を定義する。しかし、自動確認モードにはこの制限がないため、消費者サーバが負荷を超えてメモリが足りなくなり、オペレーティングシステムによってプロセスが終了する恐れがあります。したがって、消費者は、メッセージを安定かつ効率的に処理することができるという前提でのみ、確認モードの使用を提案する。
    RabbiitMQ手動確認例
    1、メッセージの転送が成功したことを確認する。
    JAVAライブラリは、Channel#basicAckおよびChannel#basicNackの方法を使用して、プロトコルで定義されたbasic.ackおよびbasic.nackの方法をそれぞれ実現する。例は以下の通りです
    //      channel  
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "a-consumer-tag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 long deliveryTag = envelope.getDeliveryTag();
                 //           ,     RabbitMQ  
                 channel.basicAck(deliveryTag, false);
             }
         });
    
    2,一回に複数のメッセージの転送が成功したことを確認する。
    手動確認モードは、バッチ確認によりネットワークフローを低減し、確認方法のmultipleパラメータをtrueに設定することができる。basic.rejectは、このパラメータがないので、RabitMQは、拡張プロトコルの一部としてbasic.nackの方法を導入する。multipleパラメータがtrueに設定される。RabbiitMQは、すべての転送ラベルが所定の値より小さいメッセージを確認する。例えば、チャネルChに未確認メッセージがあり、それらの伝達ラベルは5,6,7,8であり、確認バンドの伝達ラベルが8であり、multipleパラメータがtrueに設定されている場合、5-8メッセージはいずれも確認される。multipleパラメータがfalseに設定されている場合、5,6,7メッセージはまだ確認されていません。例は以下の通りです
    //      channel  
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "a-consumer-tag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 long deliveryTag = envelope.getDeliveryTag();
                 //          deliveryTag        ,     
                 channel.basicAck(deliveryTag, true);
             }
         });
    
    3、メッセージが失敗したことを確認して、改めてチームに入るメッセージ。
    時には、消費者はすぐに伝達されたメッセージを処理できないが、他の消費者は処理する能力がある。この場合、メッセージを再入隊させ、他の消費者にそれを受信させ、処理させる必要があるかもしれない。basic.rejectおよびbasic.nackは、このための2つのプロトコル方法である。上記の2つの方法は、通常、メッセージ伝達失敗を確認するために用いられ、MQサーバはこれらのメッセージを破棄したり、または再入隊したりすることができる。requeueパラメータによって制御されてもよく、trueに設定されると、MQサーバは転送ラベルを指定したメッセージを再入力する。例は以下の通りです
    //      channel  
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "a-consumer-tag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 long deliveryTag = envelope.getDeliveryTag();
                 //        deliveryTag       ,    
                 channel.basicReject(deliveryTag, false);
             }
         });
    
    //      channel  
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "a-consumer-tag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 long deliveryTag = envelope.getDeliveryTag();
                 //     ,    
                 channel.basicReject(deliveryTag, true);
             }
         });
    
    RabbiitMQが再入隊のメッセージを元の位置に置くことができれば、可能な限り遠征先に近い位置に置く。ある瞬間に出現すると、すべての消費者のプリフェッチ・キューが満杯になっています(メッセージを受信できません)。再入隊/再送のサイクルが現れ、ネットワーク帯域幅とメモリ・リソースの消費を引き起こします。消費者は再発送の数量を追跡して、確認に失敗したメッセージを破棄したり、一定時間後に延期して再入隊する必要があります。basic.nackを使用して同時に複数のメッセージを入隊することができ、basic.reject方法よりも1つのmultipleパラメータが多くなり、例は以下の通りである。
    //      channel  
    
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "a-consumer-tag",
         new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                 throws IOException
             {
                 long deliveryTag = envelope.getDeliveryTag();
                 //           deliveryTag       ,       
                 channel.basicNack(deliveryTag, true, true);
             }
         });
    
    手動確認モードでは、送信されたチャネルが接続をオフまたは失うと、チャネル上で確認メッセージが受信されないと自動的に再入力されます。これは、クライアントのTCP接続が失われ、消費者アプリケーション(プロセス)の障害およびチャネルレベルプロトコルの異常を含む。
    メッセージが再入隊することを考慮して、消費者はメッセージ操作の累乗性を保証する必要があります。再送信されたメッセージは、伝送されたredeliverがRabitMQによってtrueに設定され、このパラメータはメッセージの最初の送信時にfalseである。注意してください。消費者は前回他の消費者に受信されたメッセージを受け取るかもしれません。
    消費者によくあるエラー:繰り返し確認と未知のタグ
    ある伝達ラベルを消費者が繰り返し確認すると、RabbiitMQチャネル異常が発生します。存在しないリレータブを使用すると同じ異常が発生します。
    他の会報PRECONDITION_FAILED - unknown delivery tag 100の場面は、メッセージのチャネルが受信メッセージのチャネルと異なることを確認することである。メッセージ伝達確認とメッセージ伝達は同じ通路でなければならないことを覚えてください。
    チャネルプリフェッチ設定(QoS)
    メッセージは非同期的にクライアントに送信(プッシュ)されるので、任意の所与の時刻において、通常以上のメッセージが「実行中」である。また、クライアントからの手動確認は本質的に非同期である。したがって、未確認の転送ラベルスライドウィンドウがあります。開発者は通常、このウィンドウのサイズを制限して、ユーザー側に制限のないバッファ問題がないようにしたいです。メッセージプロトコルは、unknown delivery tag方法を用いて「プリフェッチカウント」値を設定することによって実現される。この値はチャネルで許可される未確認引き渡しの最大数を定義する。構成の数に達すると、RabbitMQは、少なくとも1つの未処理メッセージが確認されていない限り、チャネル上でより多くのメッセージを転送することを停止する。
    例えば、チャネルbasic.qos上に未確認の伝達ラベル5、6、7、および8があると仮定し、チャネルChのプリフェッチカウントは4に設定されており、RabbitMQは、少なくとも未完成の転送が確認されない限り、Ch上でいかなるメッセージも伝達されないだろう。確認応答がチャネルCh上で消費者によって送信されると、Chは5に設定され、RabbiitMQはメッセージを注意して伝達する。
    プリフェッチカウントがメッセージスループットに及ぼす影響
    通常、プリフェッチを増加させることは、消費者にメッセージを伝える速度を高めるが、まだ処理されていないメッセージの数も増加し、消費者のRAM消費を増加させる。適切なプリフェッチ値を見つけることは繰り返し試験の問題であり、作業負荷によって異なります。100から300までの値は、一般的に、最適なスループットを提供し、消費者を圧倒する大きなリスクをもたらさない。より高い値を取ると、収益の減少の法則によく出会う。
    参考資料
    Consmer Acknowledgements and Publisher Confirms