RabbiitMQ-メッセージ確認機構(事務+confirm)

60564 ワード

記事の目次
  • RabbiitMQ-メッセージ確認メカニズム(事務+confirm)
  • 1.RabitMQ-メッセージ確認メカニズム(事務+confirm)
  • .事務機構
  • 生産者
  • 消費者
  • .Confirmモード
  • 3.1生産者端Confirmモードの実現原理
  • 普通のシングルは
  • を送信します。
  • 生産者
  • 消費者
  • 複数のバッチ
  • 生産者
  • 消費者
  • 生産者
  • 消費者
  • RabbiitMQ-メッセージ確認機構(事務+confirm)
    1.RabbiitMQ-メッセージ確認メカニズム(事務+confirm)
    ラジビットMQでは、サーバの異常によるデータ損失問題を解決します。生産者がメッセージを送信した後、メッセージはラジビットMQサーバですか?   デフォルトでは知ることのできない には二つの方法があります。
  • AMQPプロトコル:AMQPは、トランザクション機構
  • を実現する。
  • Confirmモード
  • 2.事務の仕組み
     TxSelect TxComit TxRollBack
  • TxSelect:現在のチャンネルをtranseモードに設定するための
  • TxComit:トランザクションを提出するための
  • TxRollBack:ロールバックトランザクション用
  • 生産者
    package com.ithzk.rabbitmq.transation;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import javax.sound.midi.Soundbank;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class TxSend {
    
        private final static String QUEUE_NAME = "test_queue_tx";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            String msg = "hello tx";
    
            try {
                channel.txSelect();
    
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    
                int ex = 1/0;
    
                channel.txCommit();
            } catch (Exception e) {
                e.printStackTrace();
                channel.txRollback();
                System.out.println("Send msg rollback");
            }
    
            System.out.println("Send msg"+msg);
    
            channel.close();
            connection.close();
        }
    
    }
    
    
    消費者
    package com.ithzk.rabbitmq.transation;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class TxRecv1 {
    
        private final static String QUEUE_NAME = "test_queue_tx";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            //        
            final Channel channel = connection.createChannel();
    
            //    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //        
            channel.basicQos(1);
    
           boolean autoAck = true;
    
           channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
    
               @Override
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String msg = new String(body, "utf-8");
                   System.out.println("[1] Recv tx msg:" + msg);
               }
           });
    
           System.out.println("[Consumer 1 start]");
    
    
        }
    
    
    }
    
    
    3.Confirmモード
    3.1生産者端Confirmモードの実現原理
     生産者はチャネルをConfirmモードに設定し、一旦チャネルがconfirmモードに入ると、このチャネル上で発行されたメッセージには一意のIDが割り当てられます。(1から開始します。)メッセージが全てのマッチするキューに送られると、brook erは生産者に確認を送ります。これにより、生産者は、メッセージが正しく目的のキューに到達していることを知っています。メッセージとキューが持続可能であれば、確認メッセージはディスクに書き込まれて発行され、生産者にbrookフィードバックされる確認メッセージには、deliver-deliver-が確認メッセージのシリアル番号が含まれています。このシーケンス番号にマークされる前のメッセージはすべて処理されました。
     Confirmモードの最大の利点は、非同期処理であり、Nack(Negative ACKnowledment、否定応答)がconfrimモードをオンすることである。
    channel.confirmSelect()
    
     プログラミングモード:
  • 普通はwaitForConfirmsを呼び出すために送信します。()
  • 一括でwaitForConfirmsを呼び出すバッチを送ります。
  • 非同期confirmモード:フィードバックを提供する
  • 普通郵便で送る
    生産者
    package com.ithzk.rabbitmq.confirm;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     *     
     * @author hzk
     * @date 2018/3/10
     */
    public class ConfrimSend {
    
        private final static String QUEUE_NAME = "test_queue_confrim_one";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //     confirmSelect()  channel  confirm  
            channel.confirmSelect();
    
            String msg = "hello confirm one";
    
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    
            if (!channel.waitForConfirms()){
                System.out.println("message send fail!");
            }else{
                System.out.println("Send msg"+msg);
            }
    
            channel.close();
            connection.close();
        }
    
    }
    
    
    消費者
    package com.ithzk.rabbitmq.confirm;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class ConfirmRecv1 {
    
        private final static String QUEUE_NAME = "test_queue_confrim_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            //        
            final Channel channel = connection.createChannel();
    
            //    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
           boolean autoAck = true;
    
           channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
    
               @Override
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String msg = new String(body, "utf-8");
                   System.out.println("[1] Recv tx msg:" + msg);
               }
           });
    
           System.out.println("[Consumer 1 start]");
    
    
        }
    
    
    }
    
    
    複数のロット
     を送ってから確認します。
    生産者
    package com.ithzk.rabbitmq.confirm;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     *     
     * @author hzk
     * @date 2018/3/10
     */
    public class ConfrimSend2 {
    
        private final static String QUEUE_NAME = "test_queue_confrim_one";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //     confirmSelect()  channel  confirm  
            channel.confirmSelect();
    
            String msg = "hello confirm batch";
    
            //    
            for (int i = 0 ;i < 50 ; i++){
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            }
    
            //  
            if (!channel.waitForConfirms()){
                System.out.println("message send fail!");
            }else{
                System.out.println("Send msg"+msg);
            }
    
            channel.close();
            connection.close();
        }
    
    }
    
    
    消費者
    package com.ithzk.rabbitmq.confirm;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class ConfirmRecv1 {
    
        private final static String QUEUE_NAME = "test_queue_confrim_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            //        
            final Channel channel = connection.createChannel();
    
            //    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
           boolean autoAck = true;
    
           channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
    
               @Override
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String msg = new String(body, "utf-8");
                   System.out.println("[1] Recv tx msg:" + msg);
               }
           });
    
           System.out.println("[Consumer 1 start]");
    
    
        }
    
    
    }
    
    
    非同期コンバージョンモード
    Channelオブジェクトから提供されるConfriimListener()コールバック方法は、deliveryTag(現在Chanelから送信されているメッセージ番号)のみを含み、サブセットは各Chanelのためにunconfirmのメッセージシーケンス番号を維持し、各publishのデータに、セットの要素を1つ追加して、一回ごとにhandleAck方法を変更します。(multiple=true)記録。プログラムの動作効率から見ると、このunconfirmセットは、順序の良いセットSortedert記憶構造を採用したほうがいいです。
    生産者
    package com.ithzk.rabbitmq.confirm;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.SortedMap;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.TimeoutException;
    
    /**
     *     
     * @author hzk
     * @date 2018/3/10
     */
    public class ConfrimSend3 {
    
        private final static String QUEUE_NAME = "test_queue_confrim_one";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            //     confirmSelect()  channel  confirm  
            channel.confirmSelect();
    
            //        
            final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    
            //      
            channel.addConfirmListener(new ConfirmListener() {
                //     handleAck
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple){
                        System.out.println("handleAck multiple");
                        confirmSet.headSet(deliveryTag+1).clear();
                    }else{
                        System.out.println("handleAck multiple false");
                        confirmSet.remove(deliveryTag);
                    }
                }
    
                //handleNack 3s 10s xxx..   
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if(multiple){
                        System.out.println("handleNack multiple");
                        confirmSet.headSet(deliveryTag+1).clear();
                    }else{
                        System.out.println("handleNack multiple false");
                        confirmSet.remove(deliveryTag);
                    }
                }
            });
    
            String msg = "hello confirm async";
    
            while (true){
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
                confirmSet.add(seqNo);
            }
    
        }
    
    }
    
    
    消費者
    package com.ithzk.rabbitmq.confirm;
    
    import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author hzk
     * @date 2018/3/10
     */
    public class ConfirmRecv1 {
    
        private final static String QUEUE_NAME = "test_queue_confrim_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //    
            Connection connection = RabbitMQConnectionUtils.getConnection();
    
            //        
            final Channel channel = connection.createChannel();
    
            //    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
           boolean autoAck = true;
    
           channel.basicConsume(QUEUE_NAME,autoAck,new DefaultConsumer(channel){
    
               @Override
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   String msg = new String(body, "utf-8");
                   System.out.println("[1] Recv tx msg:" + msg);
               }
           });
    
           System.out.println("[Consumer 1 start]");
    
    
        }
    
    
    }
    
    
     ps:txモードの効率が悪いので、使用後の2つを勧めます。