Spring Cloud Stream-ビルドメッセージイベント駆動のマイクロサービス
16815 ワード
Spring Coud Stream概要
Spring Coud Streamとは何ですか?
Spring Cloud StreamはSpring Cloudのサブプロジェクトであり、MQをより便利に操作できるフレームワークであり、メッセージの中間部品に接続された高度に伸縮可能なメッセージイベント駆動のマイクロサービスを構築する目的である.
簡単に言うとSpring Cloud StreamはMQ動作を簡略化したフレームであり、そのフレームの構図は以下の通りである.
Spring Coud Streamプログラミングモデル:
図中の概念について:
Spring Cloud Streamを使う
現在は生産者としてのマイクロサービスプロジェクトがあります.このマイクロサービスのためにSpring Cloud Streamを統合します.第一歩はstream依存を追加します.
com.alibaba.cloud
spring-cloud-starter-stream-rocketmq
第二のステップは、起動クラスに
@EnableBinding
の注釈を追加し、以下の通りである.import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding(Source.class)
...
ステップ3では、配置ファイルにstreamに関する設定項目を追加します.spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# output
output:
# topic
destination: stream-test-topic
以上のステップを完了したら、プロジェクトはもうSpring Cloud Streamに統合されました.今はSpring Coud Streamを使って生産者を作成します.具体的なコードは以下の通りです.package com.zj.node.contentcenter.controller.content;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
*
* @author 01
* @date 2019-08-10
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
private final Source source;
@GetMapping("/test-stream")
public String testStream(){
Message message = MessageBuilder
.withPayload(" ")
.build();
source.output()
.send(message);
return "send message success!";
}
}
プロジェクトを起動して、このインターフェースが成功的に実行できるかどうかをテストします.もう一つの消費者向けのサービス項目:user-center、Spring Cloud Streamは、依存配置が同じなので、ここでは重複しませんが、配置と注釈の種類は変更が必要です.まず、以下のように構成されています.
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# input
input:
# topic
destination: stream-test-topic
# rocketmq group,
# MQ,
group: binder-group
起動クラスのコメントは以下の通りです.import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
...
統合完了後、Spring Cloud Streamを使って消費者を作成します.具体的なコードは以下の通りです.package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;
/**
*
*
* @author 01
* @date 2019-08-10
**/
@Slf4j
@Service
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody) {
log.info(" stream ,messageBody = {}", messageBody);
}
}
コードの作成が完了したらプロジェクトを開始します.以前はすでに生産者を通じてRocketMQにメッセージを送りましたので、コンソールは受信したメッセージを出力します.Spring Coud Streamカスタムインターフェース
以上の細かい点の勉強を通して、Spring Cloud Streamの基本的な使用を了解しました.上記の例から、inputは1つのtopic消費メッセージをバインドするために使用され、outputは逆に、1つのtopic配信メッセージをバインドするために使用されることが分かる.
しかし、実際のプロジェクトでは、複数のtopicがあるかもしれません.極端なシーンでも、異なるtopicは異なるMQを使用して実現されるかもしれません.streamがデフォルトで提供しているinputとoutputは一つのtopicしか結合できませんので、この時はstreamのカスタムインターフェースで複数の「input」と「output」の結合が異なるtopicを実現する必要があります.
以上の小節の例では、生産者がメッセージを送信する際に使用するのは
Source
インターフェースのoutput
方法であり、消費者がメッセージを送信する時に使用するのはSink
インターフェースのinput
方法であり、起動クラスの@EnableBinding
注釈に配置する必要があることが分かる.したがって、実際にはカスタムインターフェースのソースコードとこの2つのインターフェースのソースコードはほとんど一致している必要がありますが、名前が異なるだけで、使用もSource
とSink
をユーザー定義のインターフェースに変更するだけです.次に,どのようにインターフェースをカスタマイズして使うかを簡単に実証し,前の小節の例に基づいて改造した.まず生産者であり、メッセージを送信するためのインターフェースを定義し、具体的なコードは以下の通りである.
package com.zj.node.contentcenter.rocketmq;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* , stream Source
*
* @author 01
* @date 2019-08-10
**/
public interface MySource {
/**
* Name of the output channel.
*/
String MY_OUTPUT = "my-output";
/**
* @return output channel
*/
@Output(MY_OUTPUT)
MessageChannel output();
}
起動クラス@EnableBinding
に、このインターフェースを追加する.import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@EnableBinding({Source.class, MySource.class})
...
設定ファイルに以下の構成を追加します.spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# output
output:
# topic
destination: stream-test-topic
# ”output“, MySource MY_OUTPUT
my-output:
# topic
destination: stream-my-topic
生産者のコードを修正すれば次のとおりです.package com.zj.node.contentcenter.controller.content;
import com.zj.node.contentcenter.rocketmq.MySource;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
*
*
* @author 01
* @date 2019-08-03
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
private final MySource mySource;
@GetMapping("/test-stream")
public String testStream(){
Message message = MessageBuilder
.withPayload(" ")
.build();
mySource.output()
.send(message);
return "send message success!";
}
}
プロジェクトを起動してインターフェースにアクセスし、テストメッセージが正常に送信されますか?生産者を改造した後、消費者を改造し、まず消費者メッセージ用のインターフェースを定義します.具体的なコードは以下の通りです.
package com.zj.node.usercenter.rocketmq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* , stream Sink
*
* @author 01
* @date 2019-08-10
**/
public interface MySink {
/**
* Input channel name.
*/
String MY_INPUT = "my-input";
/**
* @return input channel.
*/
@Input(MY_INPUT)
SubscribableChannel input();
}
起動クラスの@EnableBinding
にも、このインターフェースを追加する必要があります.import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding({Sink.class, MySink.class})
...
設定ファイルに以下の構成を追加します.spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
# input
input:
# topic
destination: stream-test-topic
# rocketmq group,
# MQ,
group: binder-group
# ”input“, MySink MY_INPUT
my-input:
# topic
destination: stream-my-topic
group: my-group
消費者のコードを修正すると次のようになります.package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;
/**
*
*
* @author 01
* @date 2019-08-10
**/
@Slf4j
@Service
public class TestStreamConsumer {
@StreamListener(MySink.MY_INPUT)
public void receive(String messageBody) {
log.info(" - stream ,messageBody = {}", messageBody);
}
}
プロジェクトを開始します.以前はすでに生産者を通じてRocketMQにメッセージを送りましたので、コンソールは受信したメッセージを出力します.Spring Coud Streamのモニタ
我々はSpring Boot Actutorコンポーネントが監視ポイントを暴露するために使用されることを知っています.多くの監視ツールはこのコンポーネントの監視ポイントに依存して監視を行う必要があります.プロジェクトがStreamとActutorを統合すると、対応する監視ポイントが暴露されます.まずプロジェクトにActutorを統合する必要があります.以下のような依存性を追加します.
org.springframework.boot
spring-boot-starter-actuator
設定ファイルに以下の構成を追加します.management:
endpoints:
web:
exposure:
#
include: '*'
endpoint:
health:
#
show-details: always
アクセスhttp://127.0.0.1:{ }/actuator
は、露出したすべての監視エンドポイントを取得することができ、Streamの関連監視ポイントもその列にあり、以下の図である./actuator/bindings
エンドポイントは、bindings関連情報を調べるために使用することができる./actuator/channels
エンドポイントはchanelsの関連情報を調べるために使われますが、「input」と「output」はいわゆるチャンネルです.これらのチャンネルはtopicの抽象と考えられます./actuator/health
エンドポイントでは、binderおよびRocketMQの状態が確認され、主にMQの接続状況を確認するために使用され、そのstatusに接続されていない場合はDOWNとなる.Spring Cloud Stream+RocketMQ実現ビジネスメッセージ
以前Spring Cloud Alibaba RocketMQ-非同期通信を構築するためのマイクロサービスの文末において,RocketMQのビジネスメッセージを紹介し,どのように符号化するかを実証しました.本論文でSpring Cloud Streamを学習した後、Streamと結合して、以前の事務メッセージを実現するコードを再構成する.
まず設定ファイルを変更します.
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 192.168.190.129:9876
bindings:
output:
producer:
# , output channel
transactional: true
#
group: tx-test-producer-group
bindings:
# output
output:
# topic
destination: stream-test-topic
次にTestProducerService
を再構成し、具体的なコードは以下の通りである.package com.zj.node.contentcenter.service.test;
import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.content.NoticeMapper;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
* @author 01
* @date 2019-08-08
**/
@Service
@RequiredArgsConstructor
public class TestProducerService {
private final NoticeMapper noticeMapper;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
private final Source source;
public String testSendMsg(Notice notice) {
// id
String transactionId = UUID.randomUUID().toString();
// stream ,
source.output().send(
MessageBuilder.withPayload(" ")
// header ,
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("notice_id", notice.getId())
// json, toString
.setHeader("notice", JSON.toJSONString(notice))
.build()
);
return "send message success";
}
@Transactional(rollbackFor = Exception.class)
public void updateNotice(Integer noticeId, Notice notice) {
Notice newNotice = new Notice();
newNotice.setId(noticeId);
newNotice.setContent(notice.getContent());
noticeMapper.updateByPrimaryKeySelective(newNotice);
}
@Transactional(rollbackFor = Exception.class)
public void updateNoticeWithRocketMQLog(Integer noticeId, Notice notice, String transactionId) {
updateNotice(noticeId, notice);
//
rocketmqTransactionLogMapper.insertSelective(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("updateNotice")
.build()
);
}
}
最後にTestTransactionListener
を再構成し、具体的なコードは以下の通りである.package com.zj.node.contentcenter.rocketmq;
import com.alibaba.fastjson.JSON;
import com.zj.node.contentcenter.dao.log.RocketmqTransactionLogMapper;
import com.zj.node.contentcenter.domain.entity.content.Notice;
import com.zj.node.contentcenter.domain.entity.log.RocketmqTransactionLog;
import com.zj.node.contentcenter.service.test.TestProducerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
/**
*
*
* @author 01
* @date 2019-08-08
**/
@Slf4j
@RequiredArgsConstructor
// txProducerGroup
@RocketMQTransactionListener(txProducerGroup = "tx-test-producer-group")
public class TestTransactionListener implements RocketMQLocalTransactionListener {
private final TestProducerService service;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
*
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info(" . id: {}", transactionId);
Integer noticeId = Integer.parseInt((String) headers.get("notice_id"));
// header json
Notice notice = JSON.parseObject((String) headers.get("notice"), Notice.class);
try {
//
service.updateNoticeWithRocketMQLog(noticeId, notice, transactionId);
// MQ Server commit
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error(" , ", e);
// MQ Server rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
*
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.warn(" . id: {}", transactionId);
// id
RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.build()
);
// id ,
if (transactionLog == null) {
log.warn(" , , . id: {}", transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
拡張記事: