ロックMqトランザクションメッセージ送信コードフロー詳細
一、RocketMq事務メッセージの流れ:
1、まずbrookに事前要請メッセージを送ります。消費者は見えません。
2、コールバックはローカルトランザクションを実行します。たとえば、データベースを操作します。
3、事務の実行が成功したら、再度brookにメッセージを送り、brook事務の実行が成功したというメッセージを消費者に見せてください。ローカルトランザクションがタイムアウトしたらunknowに戻ります。brookはメッセージを送り、メッセージが実行されたかどうかを確認します。
二、RocketMq事務メッセージの例:
1、ロックMqに関する依存を導入する:
1、まずbrookに事前要請メッセージを送ります。消費者は見えません。
2、コールバックはローカルトランザクションを実行します。たとえば、データベースを操作します。
3、事務の実行が成功したら、再度brookにメッセージを送り、brook事務の実行が成功したというメッセージを消費者に見せてください。ローカルトランザクションがタイムアウトしたらunknowに戻ります。brookはメッセージを送り、メッセージが実行されたかどうかを確認します。
二、RocketMq事務メッセージの例:
1、ロックMqに関する依存を導入する:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2、Transation Producerクラスを作成する:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
//
TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
//2. Nameserver
producer.setNamesrvAddr("192.168.***.***:9876");
//3、
TransactionListener listener = new TransactionListenerImol();
producer.setTransactionListener(listener);
//4、
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = newThread(r);
thread.setName("client-tanscation-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
//5、 producer
producer.start();
//6. , Topic、Tag String topic, String tags, String keys, byte[] body
Message message = new Message("Topic_transaction_demo", //
"Tags", //
"Key_1", //
("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));
//7、
TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
producer.shutdown();
}
}
3、トランザクションメッセージを送信するには、Transaction Listenerインターフェースを実現するための事務傍受対象が必要であり、2つの方法的役割は、それぞれローカルトランザクションとメッセージングを実行することである。
public class TransactionListenerImol implements TransactionListener {
// key: id value:
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
//
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// id
String transactionId = message.getTransactionId();
//0: , 1: 2:
localTrans.put(transactionId, 0);
// , ,service
System.out.println("hello-demo-transaction");
try {
System.out.println(" ---");
Thread.sleep(60000*2);
System.out.println(" ---");
localTrans.put(transactionId, 1);
} catch (InterruptedException e) {
e.printStackTrace();
localTrans.put(transactionId, 2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
//
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//
String transactionId = messageExt.getTransactionId();
// id
Integer status = localTrans.get(transactionId);
//
System.out.println(" ---transactionId:" + transactionId + " :" + status);
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。