ロックMqトランザクションメッセージ送信コードフロー詳細


一、RocketMq事務メッセージの流れ:
1、まずbrookに事前要請メッセージを送ります。消費者は見えません。
2、コールバックはローカルトランザクションを実行します。たとえば、データベースを操作します。
3、事務の実行が成功したら、再度brookにメッセージを送り、brook事務の実行が成功したというメッセージを消費者に見せてください。ローカルトランザクションがタイムアウトしたらunknowに戻ります。brookはメッセージを送り、メッセージが実行されたかどうかを確認します。
二、RocketMq事務メッセージの例:
1、ロックMqに関する依存を導入する:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>
2、Transation Producerクラスを作成する:

public class TransactionProducer {

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
    //          
    TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
    //2.  Nameserver  
    producer.setNamesrvAddr("192.168.***.***:9876");
    //3、                     
    TransactionListener listener = new TransactionListenerImol();
    producer.setTransactionListener(listener);
    //4、   
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = newThread(r);
        thread.setName("client-tanscation-msg-check-thread");
        return thread;
      }
    });
    producer.setExecutorService(executorService);
    //5、  producer
    producer.start();

    //6.      ,    Topic、Tag     String topic, String tags, String keys, byte[] body
    Message message = new Message("Topic_transaction_demo", //  
        "Tags", //        
        "Key_1", //     
        ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

    //7、      
    TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

    producer.shutdown();
  }
}
3、トランザクションメッセージを送信するには、Transaction Listenerインターフェースを実現するための事務傍受対象が必要であり、2つの方法的役割は、それぞれローカルトランザクションとメッセージングを実行することである。

public class TransactionListenerImol implements TransactionListener {
  //         key:  id value:         
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  //      
  @Override
  public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //  id
    String transactionId = message.getTransactionId();
    //0:   ,     1:     2:    
    localTrans.put(transactionId, 0);
    //    ,    ,service
    System.out.println("hello-demo-transaction");
    try {
      System.out.println("        ---");
      Thread.sleep(60000*2);
      System.out.println("        ---");
      localTrans.put(transactionId, 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
      localTrans.put(transactionId, 2);
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }

  //    
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    //           
    String transactionId = messageExt.getTransactionId();
    //      id    
    Integer status = localTrans.get(transactionId);
    //    
    System.out.println("    ---transactionId:" + transactionId + "  :" + status);
    switch (status) {
      case 0:
        return LocalTransactionState.UNKNOW;
      case 1:
        return LocalTransactionState.COMMIT_MESSAGE;
      case 2:
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
}
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。