Sping Boot入門から実戦までの実戦編(一):カスタムSpring Boot Starter——アリ雲メッセージキューサービスStarterを実現します.
22708 ワード
はい、 Sping Boot入門から実戦の入門編(四):Spring Boot自動化配置 この記事では、Spring Bootの自動化構成の実現は主に以下のように行われていることを知っています.@EnbaleAutoConfigration注釈 SpringAppleクラス spring-boot-at configre jar包 spring.factoresファイル 公式に提供されたstarterの多くは二つのjarパッケージを含んでいます.一つのstarterは実現されていません.ただ管理に依存しています.一つのautconfigreパッケージは自動配置類とMETA-INF/spring.factoresファイルを含んでいます.starterをカスタマイズするときは、一つにまとめることができます.
公式に提供されたスターは、名前はspring-boot-starter-xxに従い、カスタムstarterで、命名はxx-spring-boot-starterに従う.
本文はアリ雲メッセージキューRocketMQサービスに基づいています.https://help.aliyun.com/document_detail/43349.html?spm=a2c4g.11186623.3.2.Ui5KeU)タイミングメッセージと遅延メッセージ(注文書がどのぐらい支払われていないか、自動的にクローズされるかなど)の送受信機能の急速な開発を実現するために、カスタムstarterが実現される.ソースの住所: mq-spring-boot-starter
1 mq-spring-boot-starter mavenプロジェクトを作成します.pom.xmlに依存を導入する:
2属性設定クラスを定義します.属性設定ファイルの「aliyun.mq」で先頭のプロパティを読みだします.
3自動配置クラスを定義します.そのうち @Coditional OnProperty(prefix=「aliyun.mq.com nsumer」、value=「enabled」、havingValue=「true」)は、設定されている属性の中に、属性aliyun.mq.com.enabledが存在し、trueの値があるときに、このBeanが実用化されることを示しています.いくつかのアプリケーションは、生産者または消費者のみが必要であるため、この属性によって、対応するBeaビンが実装されているかどうかを制御することができる.
4.生産者を定義する.send方法は、同期された方法でメッセージコンテンツbodyを一定の遅延後、指定メッセージキューtopicに送信し、タグがタグである(消費者は、同じtopicのメッセージをフィルタすることができる).sendAsyncは、非同期的にメッセージを送信し、メッセージの送信が完了したら、指定されたコールバックSendCallbackによってカスタム処理を行う.ここではデフォルトのコールバックはログでのみ記録されます.
5消費者を定義する.subscribe方法は、topicを指定するいくつかのタグtagsをメッセージ購読し、このtopicにこれらのtagsを含むメッセージが到着すると、message Listenerによって処理される.ここでは、抽象的なクラスのAbstractMessage Listenerを定義し、テンプレート方法によってメッセージの処理ロジックを統一し(正常消費、comit;異常が発生し、再消費)、消費者はAbstractMessage Listenerを継承するだけで、handle方法でメッセージ消費を完了することができます.
6前にすべてのメッセージ・キューサービスに関するコードの実現が完了しました.参照項目を自動的に配置するには、META-INF/spring.factoresファイルを定義し、自動配置クラスをorg.sprigframe ork.boot.atoconfigre.EnbaleAutoConfigrationに割り当てます.
7使用
7.1 pom.xml導入依存、現在のバージョン:1.0.0-NAPSHOT
公式に提供されたスターは、名前はspring-boot-starter-xxに従い、カスタムstarterで、命名はxx-spring-boot-starterに従う.
本文はアリ雲メッセージキューRocketMQサービスに基づいています.https://help.aliyun.com/document_detail/43349.html?spm=a2c4g.11186623.3.2.Ui5KeU)タイミングメッセージと遅延メッセージ(注文書がどのぐらい支払われていないか、自動的にクローズされるかなど)の送受信機能の急速な開発を実現するために、カスタムstarterが実現される.ソースの住所: mq-spring-boot-starter
1 mq-spring-boot-starter mavenプロジェクトを作成します.pom.xmlに依存を導入する:
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-autoconfigureartifactId>
dependency>
<dependency>
<groupId>com.aliyun.openservicesgroupId>
<artifactId>ons-clientartifactId>
<version>1.7.0.Finalversion>
dependency>
dependencies>
2属性設定クラスを定義します.属性設定ファイルの「aliyun.mq」で先頭のプロパティを読みだします.
@ConfigurationProperties(prefix = "aliyun.mq")
public class MqPropertiesConfig {
private String onsAddr;
private String topic;
private String accessKey;
private String secretKey;
private Properties producer;
private Properties consumer;
private String tagSuffix;
setter/getter;
}
3自動配置クラスを定義します.そのうち @Coditional OnProperty(prefix=「aliyun.mq.com nsumer」、value=「enabled」、havingValue=「true」)は、設定されている属性の中に、属性aliyun.mq.com.enabledが存在し、trueの値があるときに、このBeanが実用化されることを示しています.いくつかのアプリケーションは、生産者または消費者のみが必要であるため、この属性によって、対応するBeaビンが実装されているかどうかを制御することができる.
@Configuration
@EnableConfigurationProperties(MqPropertiesConfig.class)
public class MqAutoConfig {
@Autowired
private MqPropertiesConfig propConfig;
@Bean(initMethod="start", destroyMethod = "shutdown")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "aliyun.mq.producer",value = "enabled",havingValue = "true")
public MqTimerProducer mqTimerProducer(){
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ProducerId, propConfig.getProducer().getProperty("producerId"));
properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
properties.setProperty(PropertyKeyConst.ONSAddr, propConfig.getOnsAddr());
properties.setProperty("topic", propConfig.getTopic());
return new MqTimerProducer(properties);
}
@Bean(initMethod="start", destroyMethod = "shutdown")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "aliyun.mq.consumer",value = "enabled",havingValue = "true")
public MqConsumer mqConsumer(){
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ConsumerId, propConfig.getConsumer().getProperty("consumerId"));
properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
properties.setProperty(PropertyKeyConst.ONSAddr, propConfig.getOnsAddr());
properties.setProperty("topic", propConfig.getTopic());
return new MqConsumer(properties);
}
}
4.生産者を定義する.send方法は、同期された方法でメッセージコンテンツbodyを一定の遅延後、指定メッセージキューtopicに送信し、タグがタグである(消費者は、同じtopicのメッセージをフィルタすることができる).sendAsyncは、非同期的にメッセージを送信し、メッセージの送信が完了したら、指定されたコールバックSendCallbackによってカスタム処理を行う.ここではデフォルトのコールバックはログでのみ記録されます.
public class MqTimerProducer {
private final static Logger LOG = LoggerFactory.getLogger(MqTimerProducer.class);
private Properties properties;
private Producer producer;
private String topic;
public MqTimerProducer(Properties properties) {
if (properties == null || properties.get(PropertyKeyConst.ProducerId) == null
|| properties.get(PropertyKeyConst.AccessKey) == null
|| properties.get(PropertyKeyConst.SecretKey) == null
|| properties.get(PropertyKeyConst.ONSAddr) == null
|| properties.get("topic") == null) {
throw new ONSClientException("producer properties not set properly.");
}
this.properties = properties;
this.topic = properties.getProperty("topic");
}
public void start() {
this.producer = ONSFactory.createProducer(this.properties);
this.producer.start();
}
public void shutdown() {
if (this.producer != null) {
this.producer.shutdown();
}
}
public void send(String tag, String body, long delay) {
LOG.info("start to send message. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
if (topic == null || tag == null || body == null) {
throw new RuntimeException("topic, tag, or body is null.");
}
Message message = new Message(topic, tag, body.getBytes());
message.setStartDeliverTime(System.currentTimeMillis() + delay);
SendResult result = this.producer.send(message);
LOG.info("send message success. ", result.toString());
}
public void sendAsync(String tag, String body, long delay) {
this.sendAsync(tag, body, delay, new DefaultSendCallback());
}
public void sendAsync(String tag, String body, long delay, SendCallback sendCallback) {
LOG.info("start to send message async. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
if (topic == null || tag == null || body == null) {
throw new RuntimeException("topic, tag, or body is null.");
}
Message message = new Message(topic, tag, body.getBytes());
message.setStartDeliverTime(System.currentTimeMillis() + delay);
this.producer.sendAsync(message, sendCallback);
}
setter/getter;
}
5消費者を定義する.subscribe方法は、topicを指定するいくつかのタグtagsをメッセージ購読し、このtopicにこれらのtagsを含むメッセージが到着すると、message Listenerによって処理される.ここでは、抽象的なクラスのAbstractMessage Listenerを定義し、テンプレート方法によってメッセージの処理ロジックを統一し(正常消費、comit;異常が発生し、再消費)、消費者はAbstractMessage Listenerを継承するだけで、handle方法でメッセージ消費を完了することができます.
public class MqConsumer {
private final static Logger LOG = LoggerFactory.getLogger(MqConsumer.class);
private Properties properties;
private Consumer consumer;
private String topic;
public MqConsumer(Properties properties) {
if (properties == null || properties.get(PropertyKeyConst.ConsumerId) == null
|| properties.get(PropertyKeyConst.AccessKey) == null
|| properties.get(PropertyKeyConst.SecretKey) == null
|| properties.get(PropertyKeyConst.ONSAddr) == null
|| properties.get("topic") == null) {
throw new ONSClientException("consumer properties not set properly.");
}
this.properties = properties;
this.topic = properties.getProperty("topic");
}
public void start() {
this.consumer = ONSFactory.createConsumer(properties);
this.consumer.start();
}
public void shutdown() {
if (this.consumer != null) {
this.consumer.shutdown();
}
}
/**
* @param tags tag '||' , *
* @param messageListener
*/
public void subscribe(String tags, AbstractMessageListener messageListener) {
LOG.info("subscribe [topic: {}, tags: {}, messageListener: {}]", topic, tags, messageListener.getClass().getCanonicalName());
consumer.subscribe(topic, tags, messageListener);
}
}
public abstract class AbstractMessageListener implements MessageListener {
private final static Logger LOG = LoggerFactory.getLogger(AbstractMessageListener.class);
public abstract void handle(String body);
@Override
public Action consume(Message message, ConsumeContext context) {
LOG.info("receive message. [topic: {}, tag: {}, body: {}, msgId: {}, startDeliverTime: {}]", message.getTopic(), message.getTag(), new String(message.getBody()), message.getMsgID(), message.getStartDeliverTime());
try {
handle(new String(message.getBody()));
LOG.info("handle message success.");
return Action.CommitMessage;
} catch (Exception e) {
//
LOG.warn("handle message fail, requeue it.", e);
return Action.ReconsumeLater;
}
}
}
6前にすべてのメッセージ・キューサービスに関するコードの実現が完了しました.参照項目を自動的に配置するには、META-INF/spring.factoresファイルを定義し、自動配置クラスをorg.sprigframe ork.boot.atoconfigre.EnbaleAutoConfigrationに割り当てます.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.ieyecloud.springboot.mq.config.MqAutoConfig
7使用
7.1 pom.xml導入依存、現在のバージョン:1.0.0-NAPSHOT
com.ieyecloud
mq-spring-boot-starter
1.0-SNAPSHOT
7.2 apprationプロファイルに対応する構成を追加するaliyun:
mq:
onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
topic: mulin_topic_test
accessKey: xxx
secretKey: xxx
producer:
enabled: true # false producer, true producerId
producerId: xxx
consumer:
enabled: true # false consumer, true consumerId
consumerId: xxx
7.3 producerを使用して、consumerは必要に応じて対応するインスタンスを注入するだけである.@Autowired
private MqTimerProducer producer;
@Autowired
private MqConsumer consumer;
7.4 consumer傍受処理類が実現し、AbstractMessage Listener類を継承し、handle方法を実現すればいいです. @Component
public class QuestionStatusMessageListener extends AbstractMessageListener{
@Autowired
private QuickQuestionService questionService;
@Override
public void handle(String s) {
QuestionStatusMessage message = JsonUtil.fromJson(s, QuestionStatusMessage.class);
questionService.updateStatus(message.getQid(), message.getCs(), message.getTs());
}
}