Spring Cloud Aliba RocketMQ-非同期通信を構築するマイクロサービス
MQ導入後のアーキテクチャの進化
MQの選択
メッセージキュー比較参照テーブル:
RocketMQ vs. ActiveMQ vs. Kafka:
参照先:メッセージキュー比較参照テーブル RocketMQ vs. ActiveMQ vs. Kafka
CentOS 7上にRocketMQを構築
環境要件: CentOS 7.2 64ビットJDK 1.8+ 4 G+の空きディスク容量 1、RocketMQのバイナリパッケージをダウンロードします.ここでは4.5.1バージョンを使用しています.ダウンロード先は以下の通りです.
http://rocketmq.apache.org/release_notes/release-notes-4.5.1/
wgetコマンドを使用してダウンロード:
2、ダウンロードした圧縮パッケージを解凍し、適切なディレクトリの下に移動する.
注意:unzipコマンドがインストールされていない場合は、yum install-y unzipのコマンドを使用してインストールします.
3、rocketmqのルートディレクトリに入り、次のディレクトリとファイルが含まれているかどうかを確認します.
4、問題がなければ、次のコマンドを使用してName Serverを起動します.
5.デフォルトの9876ポートがリスニングされているかどうかを確認し、Name Serverが正常に起動したかどうかを確認します.
6、Brokerを起動する:
7、Brokerが起動に成功したかどうかを検証し、起動に成功した場合、次のようなログが表示されます.
Name ServerとBrokerを停止するには、次の2つのコマンドを順に実行します.
RocketMQ機能が正常であることを確認
1、生産メッセージが正常であることを検証し、以下のコマンドを実行する.
通常、次のような出力が表示されます.これは、メッセージを生成した後に成功したresultです.
2、消費メッセージが正常であることを検証し、以下のコマンドを実行する.
通常、次のような出力が表示されます.これは消費されたメッセージの内容です.
RocketMQコンソールの構築
RocketMQはSpring Bootに基づいて開発されたビジュアル化コンソールを公式に提供しており、RocketMQの稼働状況を簡単に確認し、運用効率を向上させることができます.このセクションでは、RocketMQを構築するコンソールの構築方法について説明します.私たちが使用しているRocketMQバージョンは4.5.1なので、RocketMQの4.5.1バージョンに合うようにコンソールのソースコードを変更する必要があります.
1、まずソースコードをダウンロードする必要があります.2つの方法があります.1つはgitクローンコード倉庫を使用すること、2つはrocketmq-externalsのzipパッケージを直接ダウンロードすることです.私はgit方式を使用して、以下のコマンドを実行します.
2、コンソールコードを修正し、IDEで
2.1、プロジェクト中の
2.2、依存を修正します.consoleプロジェクトのデフォルトで使用されているrocketmqバージョンは4.4.0で、ここで使用しているのは4.5.1と完全に互換性がないので、依存バージョンを修正して、この行を見つける必要があります.
次のように変更します.
2.3、コードを修正し、rocketmqのバージョンを修正したため、
次のように変更します.
3、構築をパッケージして起動し、ideaのterminalを開き、次のコマンドを実行します.
4、ブラウザを使ってコンソールにアクセスする.ここではポートを修正したので、アクセスアドレスは
英語に慣れていなければ右上で言語を切り替えることができます:
コンソールはビジュアル化されたインタフェースであり、中国語をサポートしているため、ここでは多くの説明にすぎません.公式のコンソール使用説明ドキュメントを参照してください. RocketMQ使用文書 RocketMQ用語と概念
ここでは基本的な用語と概念を簡単に思考ガイドにまとめました:
公式ドキュメント: Apache RocketMQ開発者ガイド Springメッセージプログラミングモデル-作成者
上記のセクションでRocketMQを構築した後、Springのメッセージプログラミングモデルを使用して、簡単な例を作成します.まず、プロジェクトに依存関係を追加する必要があります.
プロファイルにrocketmqを追加する構成は、次のとおりです.
生産者のコードを作成します.ここではControllerを例に挙げます.具体的なコードは以下の通りです.
作成が完了したら、プロジェクトを開始し、このインタフェースにアクセスします:
メッセージ送信に成功したら、RocketMQのコンソールで確認できます:
メッセージ・ボディは、メッセージの詳細を次のように表示できます.
生産者のコードから見ると、非常に簡単で、RocketMQTemplateを1つ使うだけでオブジェクトをメッセージ体に変換してメッセージを送信することができます.実際にはRocketMQ以外にも、他のMQにも対応するTemplateがあり、以下のようになっています. RocketMQ:RocketMQTemplate ActiveMQ/Artemis:JmsTemplate RabbitMQ:AmqpTemplate Kafka:KafkaTemplate
Springメッセージプログラミングモデル-消費者の作成
消費者プロジェクトでは、rocketmqの依存も追加する必要があります.
同様に、Name Serverの接続先を設定する必要があります.
消費者のコードを作成します.具体的なコードは以下の通りです.
作成が完了したらプロジェクトを開始します.以前はキューにメッセージを送信していたので、消費者プロジェクトが起動すると、メッセージを傍受して消費することができます.コンソールは次のログを出力します.
RocketMQトランザクションメッセージ
RocketMQがトランザクション・メッセージをサポートしていることはよく知られていますが、これは多くの人がRocketMQをメッセージ・ミドルウェアとして使用することを選んだ大きな原因であり、RocketMQの大きな特定でもあります.RocketMQトランザクションメッセージの流れを下図に示します.画像:http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message
原図は英語なので、大まかな翻訳を行いました.
プロセスを簡単に分析します.
1、生産者はMQサーバーに半メッセージを送信し、半メッセージは特殊なメッセージであり、このメッセージはMQサーバーに格納されるが、一時的に配達できない状態としてマークされるため、消費者はこのメッセージを消費しない
2.半メッセージ送信に成功すると、生産者はローカル事務を実行する
3、生産者はローカルトランザクションの実行結果に基づいて、MQサーバにcommitまたはrollbackメッセージを二次確認する.MQサーバがcommitを受信すると、メッセージの半分が配達可能状態としてマークされ、消費者は消費することができる.逆に,MQサーバがrollbackを受信するとメッセージの半分が破棄され,消費者は消費できなくなる.
4、MQサーバーが二次確認のメッセージを受信していない場合、または生産者がローカルトランザクションの実行を一時停止した場合、MQサーバーはタイミング(デフォルト1分)で生産者にレビューメッセージを送信し、生産者のローカルトランザクションのステータスを確認します.その後、生産者は、レビューされたローカルトランザクションの実行結果に基づいて、commitまたはrollbackメッセージをMQサーバに再送信します.
概念用語:ハーフメッセージ(Half Message):一時的に消費できないメッセージで、生産者はMQサーバにメッセージを送信したが、このメッセージは「一時的に配達できない」状態とマークされ、先に格納される.消費者はこのニュースを消費しません メッセージレビュー(Message Status Check):ネットワークの切断または生産者の再起動により、トランザクション・メッセージが失われる可能性のある2回目の確認.MQサーバは、メッセージが長い間半メッセージ状態であることを発見すると、メッセージの最終状態(コミットまたはロールバック) を問い合わせる要求をメッセージプロバイダに送信する.
メッセージ3: Commit:トランザクションメッセージを送信し、消費者はこのメッセージ を消費することができる. Rollback:トランザクションメッセージをロールバックし、brokerはメッセージを削除し、消費者は を消費できません. UNKNOWN:brokerメッセージのステータスを確認する必要がある エンコーディングによるRocketMQトランザクションメッセージの実装
RocketMQトランザクションメッセージを実装するには、フローチャートに従っていくつかのコードを記述する必要があります.エンコーディングを開始する前に、まずデータベースにRocketMQのトランザクション・ログ・テーブルを作成し、ローカル・トランザクション・レビューの根拠として使用します.テーブル構造は次のとおりです.
次に、トランザクション・メソッドとして動作するデータ・テーブルを再構築します.テーブル構造は次のとおりです.
次にコードを書き始め、まずサービスを定義します.そこにはトランザクション注釈付きの方法とトランザクションメッセージを送信する方法があります.具体的なコードは以下の通りです.
ローカルトランザクション・リスナーを実装し、トランザクション・メソッドを実行し、ローカル・トランザクション・ステータスのレビュー・メソッドを提供します.具体的なコードは以下の通りです.
これらのメソッドの実行プロセスを簡単に説明します.
まず,
RocketMQログ関連ピット
rocketmqには独自のログシステムがあるため、デフォルトではSlf 4 jは使用されません.
MQの選択
メッセージキュー比較参照テーブル:
RocketMQ vs. ActiveMQ vs. Kafka:
参照先:
CentOS 7上にRocketMQを構築
環境要件:
http://rocketmq.apache.org/release_notes/release-notes-4.5.1/
wgetコマンドを使用してダウンロード:
[root@study-01 ~]# cd /usr/local/src
[root@study-01 /usr/local/src]# wget http://mirror.bit.edu.cn/apache/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
2、ダウンロードした圧縮パッケージを解凍し、適切なディレクトリの下に移動する.
[root@study-01 /usr/local/src]# unzip rocketmq-all-4.5.1-bin-release.zip
[root@study-01 /usr/local/src]# mv rocketmq-all-4.5.1-bin-release /usr/local/rocketmq-4.5.1
注意:unzipコマンドがインストールされていない場合は、yum install-y unzipのコマンドを使用してインストールします.
3、rocketmqのルートディレクトリに入り、次のディレクトリとファイルが含まれているかどうかを確認します.
[root@study-01 /usr/local/src]# cd /usr/local/rocketmq-4.5.1
[root@study-01 /usr/local/rocketmq-4.5.1]# ls
benchmark bin conf lib LICENSE NOTICE README.md
4、問題がなければ、次のコマンドを使用してName Serverを起動します.
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqnamesrv &
[1] 2448
[root@study-01 /usr/local/rocketmq-4.5.1]#
5.デフォルトの9876ポートがリスニングされているかどうかを確認し、Name Serverが正常に起動したかどうかを確認します.
[root@study-01 /usr/local/rocketmq-4.5.1]# netstat -lntp |grep java
tcp6 0 0 :::9876 :::* LISTEN 2454/java
[root@study-01 /usr/local/rocketmq-4.5.1]#
6、Brokerを起動する:
[root@study-01 /usr/local/rocketmq-4.5.1]# nohup sh bin/mqbroker -n localhost:9876 &
[2] 2485
[root@study-01 /usr/local/rocketmq-4.5.1]#
7、Brokerが起動に成功したかどうかを検証し、起動に成功した場合、次のようなログが表示されます.
[root@study-01 /usr/local/rocketmq-4.5.1]# cat ~/logs/rocketmqlogs/broker.log |grep "boot success"
2019-08-04 01:27:38 INFO main - The broker[study-01, 192.168.190.129:10911] boot success. serializeType=JSON and name server is localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]#
Name ServerとBrokerを停止するには、次の2つのコマンドを順に実行します.
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown broker
The mqbroker(2492) is running...
Send shutdown request to mqbroker(2492) OK #
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/mqshutdown namesrv
The mqnamesrv(2454) is running...
Send shutdown request to mqnamesrv(2454) OK #
[2]+ 143 nohup sh bin/mqbroker -n localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]#
RocketMQ機能が正常であることを確認
1、生産メッセージが正常であることを検証し、以下のコマンドを実行する.
[root@study-01 /usr/local/rocketmq-4.5.1]# export NAMESRV_ADDR=localhost:9876
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
通常、次のような出力が表示されます.これは、メッセージを生成した後に成功したresultです.
SendResult [sendStatus=SEND_OK, msgId=C0A8BE810A690D7163610FCC253B03E7, offsetMsgId=C0A8BE8100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=study-01, queueId=3], queueOffset=249]
2、消費メッセージが正常であることを検証し、以下のコマンドを実行する.
[root@study-01 /usr/local/rocketmq-4.5.1]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
通常、次のような出力が表示されます.これは消費されたメッセージの内容です.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=3, storeSize=180, queueOffset=242, sysFlag=0, bornTimestamp=1564853837073, bornHost=/192.168.190.129:34708, storeTimestamp=1564853837074, storeHost=/192.168.190.129:10911, msgId=C0A8BE8100002A9F000000000002AA4E, commitLogOffset=174670, bodyCRC=911284903, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1564854006859, UNIQ_KEY=C0A8BE810A690D7163610FCC251103CB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 49], transactionId='null'}]]
RocketMQコンソールの構築
RocketMQはSpring Bootに基づいて開発されたビジュアル化コンソールを公式に提供しており、RocketMQの稼働状況を簡単に確認し、運用効率を向上させることができます.このセクションでは、RocketMQを構築するコンソールの構築方法について説明します.私たちが使用しているRocketMQバージョンは4.5.1なので、RocketMQの4.5.1バージョンに合うようにコンソールのソースコードを変更する必要があります.
1、まずソースコードをダウンロードする必要があります.2つの方法があります.1つはgitクローンコード倉庫を使用すること、2つはrocketmq-externalsのzipパッケージを直接ダウンロードすることです.私はgit方式を使用して、以下のコマンドを実行します.
git clone https://github.com/apache/rocketmq-externals.git
2、コンソールコードを修正し、IDEで
rocketmq-console
項目を開く.下図のように:2.1、プロジェクト中の
application.properties
配置ファイルを修正する.ここでは主にリスニングポートとName Serverの接続アドレスを修正した.その他の配置項目について必要があれば、説明に従って自分で修正することができる.# console , 8080
server.port=8011
# Name Server ; , console , - - NameSvrAddrList
rocketmq.config.namesrvAddr=192.168.190.129:9876
2.2、依存を修正します.consoleプロジェクトのデフォルトで使用されているrocketmqバージョンは4.4.0で、ここで使用しているのは4.5.1と完全に互換性がないので、依存バージョンを修正して、この行を見つける必要があります.
4.4.0
次のように変更します.
4.5.1
2.3、コードを修正し、rocketmqのバージョンを修正したため、
org.apache.rocketmq.console.service.impl.MessageServiceImpl#queryMessageByTopic
方法のコンパイルエラーが発生するので、ここのコードを変更する必要があります.@Override
public List queryMessageByTopic(String topic, final long begin, final long end) {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
...
次のように変更します.
@Override
public List queryMessageByTopic(String topic, final long begin, final long end) {
RPCHook rpcHook = null;
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
...
3、構築をパッケージして起動し、ideaのterminalを開き、次のコマンドを実行します.
# rocketmq-console
mvn clean package -DskipTests
# jar
cd target
# rocketmq console
java -jar rocketmq-console-ng-1.0.1.jar
4、ブラウザを使ってコンソールにアクセスする.ここではポートを修正したので、アクセスアドレスは
http://localhost:8011
である.通常の状況では、英語に慣れていなければ右上で言語を切り替えることができます:
コンソールはビジュアル化されたインタフェースであり、中国語をサポートしているため、ここでは多くの説明にすぎません.公式のコンソール使用説明ドキュメントを参照してください.
ここでは基本的な用語と概念を簡単に思考ガイドにまとめました:
公式ドキュメント:
上記のセクションでRocketMQを構築した後、Springのメッセージプログラミングモデルを使用して、簡単な例を作成します.まず、プロジェクトに依存関係を追加する必要があります.
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3
プロファイルにrocketmqを追加する構成は、次のとおりです.
rocketmq:
name-server: 192.168.190.129:9876
producer:
# : group
group: test-group
生産者のコードを作成します.ここではControllerを例に挙げます.具体的なコードは以下の通りです.
package com.zj.node.contentcenter.controller.content;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
*
*
* @author 01
* @date 2019-08-03
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
/**
* RocketMQ api
*/
private final RocketMQTemplate rocketMQTemplate;
@GetMapping("/test-rocketmq/sendMsg")
public String testSendMsg() {
String topic = "test-topic";
//
rocketMQTemplate.convertAndSend(topic, MyMessage.getInstance());
return "send message success";
}
}
@Data
class MyMessage {
private Integer id;
private String name;
private String status;
private Date createTime;
static MyMessage getInstance() {
MyMessage message = new Message();
message.id = 1;
message.name = "×××";
message.status = "default";
message.createTime = new Date();
return message;
}
}
作成が完了したら、プロジェクトを開始し、このインタフェースにアクセスします:
メッセージ送信に成功したら、RocketMQのコンソールで確認できます:
メッセージ・ボディは、メッセージの詳細を次のように表示できます.
生産者のコードから見ると、非常に簡単で、RocketMQTemplateを1つ使うだけでオブジェクトをメッセージ体に変換してメッセージを送信することができます.実際にはRocketMQ以外にも、他のMQにも対応するTemplateがあり、以下のようになっています.
Springメッセージプログラミングモデル-消費者の作成
消費者プロジェクトでは、rocketmqの依存も追加する必要があります.
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3
同様に、Name Serverの接続先を設定する必要があります.
rocketmq:
name-server: 192.168.190.129:9876
消費者のコードを作成します.具体的なコードは以下の通りです.
package com.zj.node.usercenter.rocketmq;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
*
*
* @author 01
* @date 2019-08-03
**/
@Slf4j
@Component
// topic topic ,consumerGroup ,
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener {
/**
*
*
* @param message
*/
@Override
public void onMessage(MyMessage message) {
log.info(" test-topic ");
log.info(JSON.toJSONString(message));
}
}
/**
*
*/
@Data
class MyMessage {
private Integer id;
private String name;
private String status;
private Date createTime;
}
作成が完了したらプロジェクトを開始します.以前はキューにメッセージを送信していたので、消費者プロジェクトが起動すると、メッセージを傍受して消費することができます.コンソールは次のログを出力します.
RocketMQトランザクションメッセージ
RocketMQがトランザクション・メッセージをサポートしていることはよく知られていますが、これは多くの人がRocketMQをメッセージ・ミドルウェアとして使用することを選んだ大きな原因であり、RocketMQの大きな特定でもあります.RocketMQトランザクションメッセージの流れを下図に示します.
原図は英語なので、大まかな翻訳を行いました.
プロセスを簡単に分析します.
1、生産者はMQサーバーに半メッセージを送信し、半メッセージは特殊なメッセージであり、このメッセージはMQサーバーに格納されるが、一時的に配達できない状態としてマークされるため、消費者はこのメッセージを消費しない
2.半メッセージ送信に成功すると、生産者はローカル事務を実行する
3、生産者はローカルトランザクションの実行結果に基づいて、MQサーバにcommitまたはrollbackメッセージを二次確認する.MQサーバがcommitを受信すると、メッセージの半分が配達可能状態としてマークされ、消費者は消費することができる.逆に,MQサーバがrollbackを受信するとメッセージの半分が破棄され,消費者は消費できなくなる.
4、MQサーバーが二次確認のメッセージを受信していない場合、または生産者がローカルトランザクションの実行を一時停止した場合、MQサーバーはタイミング(デフォルト1分)で生産者にレビューメッセージを送信し、生産者のローカルトランザクションのステータスを確認します.その後、生産者は、レビューされたローカルトランザクションの実行結果に基づいて、commitまたはrollbackメッセージをMQサーバに再送信します.
概念用語:
メッセージ3:
RocketMQトランザクションメッセージを実装するには、フローチャートに従っていくつかのコードを記述する必要があります.エンコーディングを開始する前に、まずデータベースにRocketMQのトランザクション・ログ・テーブルを作成し、ローカル・トランザクション・レビューの根拠として使用します.テーブル構造は次のとおりです.
次に、トランザクション・メソッドとして動作するデータ・テーブルを再構築します.テーブル構造は次のとおりです.
次にコードを書き始め、まずサービスを定義します.そこにはトランザクション注釈付きの方法とトランザクションメッセージを送信する方法があります.具体的なコードは以下の通りです.
package com.zj.node.contentcenter.service.test;
import com.zj.node.contentcenter.dao.content.NoticeMapper;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.UUID;
/**
* @author 01
* @date 2019-08-08
**/
@Service
@RequiredArgsConstructor
public class TestProducerService {
private final RocketMQTemplate rocketMQTemplate;
private final NoticeMapper noticeMapper;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
public String testSendMsg(Notice notice) {
// topic
String topic = "test-topic";
//
String txProducerGroup = "tx-test-producer-group";
// id
String transactionId = UUID.randomUUID().toString();
//
rocketMQTemplate.sendMessageInTransaction(
txProducerGroup, topic,
//
MessageBuilder.withPayload(" ")
// header ,
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("notice_id", notice.getId())
.build(),
// executeLocalTransaction
notice);
return "send message success";
}
@Transactional(rollbackFor = Exception.class)
public void updateNotice(Integer noticeId, Notice notice) {
Notice newNotice = new Notice();
newNotice.setId(noticeId);
newNotice.setContent(notice.getContent());
noticeMapper.updateByPrimaryKeySelective(newNotice);
}
@Transactional(rollbackFor = Exception.class)
public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
updateNotice(noticeId, notice);
//
rocketmqTransactionLogMapper.insertSelective(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("updateNotice")
.build()
);
}
}
ローカルトランザクション・リスナーを実装し、トランザクション・メソッドを実行し、ローカル・トランザクション・ステータスのレビュー・メソッドを提供します.具体的なコードは以下の通りです.
package com.zj.node.contentcenter.rocketmq;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import com.zj.node.contentcenter.service.test.TestProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
/**
*
*
* @author 01
* @date 2019-08-08
**/
@Slf4j
@RequiredArgsConstructor
// txProducerGroup sendMessageInTransaction
@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {
private final TestProducerService service;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
*
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info(" . id: {}", transactionId);
// header String
Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
try {
//
service.updateNoticeWithRocketMQLog(noticeId, (Notice) arg, transactionId);
// , MQ Server commit
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error(" , ", e);
// MQ Server rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
*
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.warn(" . id: {}", transactionId);
// id
RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.build()
);
// id ,
if (transactionLog == null) {
log.warn(" , , . id: {}", transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
これらのメソッドの実行プロセスを簡単に説明します.
まず,
TestProducerService.testSendMsg
を呼び出してMQサーバに半メッセージを送信し,このメソッドではローカルトランザクションメソッドが実行されないこともコードから分かる.MQ Serverが半メッセージの受信に成功すると、生産者に受信に成功したことを通知し、その後、executeLocalTransaction
のトランザクション注釈付きメソッドTestProducerService
が呼び出され、トランザクションメソッドの実行が完了すると、MQサーバにローカルトランザクションステータスが返されます.updateNoticeWithRocketMQLog
メソッドが返すトランザクション状態がexecuteLocalTransaction
または何らかの理由で実行されていない場合、MQ Serverは二次確認メッセージを受信できず、デフォルトでは1分後に生産者にレビューメッセージを送信し、生産者がレビューメッセージを受信するとローカルトランザクションリスナーのUNKNOWN
メソッドが実行されます.トランザクション・ログでテーブルのデータを記録し、トランザクションのステータスを確認して返します.RocketMQログ関連ピット
rocketmqには独自のログシステムがあるため、デフォルトではSlf 4 jは使用されません.
checkLocalTransaction
メソッドに反映すると、このメソッドの実行中に異常が投げ出された場合、異常情報はコンソールに印刷されずrocketmq_に出力されるclient.logログファイルにあります.関連ソース:executeLocalTransaction
rocketmqのログをコンソールに出力する場合は、クラスを起動するmainメソッドに次のコードを追加する必要があります.// rocketmq slf4j
System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");