Sping Boot入門から実戦までの実戦編(一):カスタムSpring Boot Starter——アリ雲メッセージキューサービスStarterを実現します.

22708 ワード

はい、 Sping Boot入門から実戦の入門編(四):Spring Boot自動化配置 この記事では、Spring Bootの自動化構成の実現は主に以下のように行われていることを知っています.
  • @EnbaleAutoConfigration注釈
  • SpringAppleクラス
  • spring-boot-at configre jar包
  • spring.factoresファイル
  • 公式に提供されたstarterの多くは二つのjarパッケージを含んでいます.一つのstarterは実現されていません.ただ管理に依存しています.一つのautconfigreパッケージは自動配置類とMETA-INF/spring.factoresファイルを含んでいます.starterをカスタマイズするときは、一つにまとめることができます.
    公式に提供されたスターは、名前はspring-boot-starter-xxに従い、カスタムstarterで、命名はxx-spring-boot-starterに従う.
    本文はアリ雲メッセージキューRocketMQサービスに基づいています.https://help.aliyun.com/document_detail/43349.html?spm=a2c4g.11186623.3.2.Ui5KeU)タイミングメッセージと遅延メッセージ(注文書がどのぐらい支払われていないか、自動的にクローズされるかなど)の送受信機能の急速な開発を実現するために、カスタムstarterが実現される.ソースの住所: mq-spring-boot-starter
      
    1 mq-spring-boot-starter mavenプロジェクトを作成します.pom.xmlに依存を導入する:
    <dependencies>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-autoconfigureartifactId>
            dependency>
            <dependency>
                <groupId>com.aliyun.openservicesgroupId>
                <artifactId>ons-clientartifactId>
                <version>1.7.0.Finalversion>
            dependency>
        dependencies>
     
    2属性設定クラスを定義します.属性設定ファイルの「aliyun.mq」で先頭のプロパティを読みだします.
    @ConfigurationProperties(prefix = "aliyun.mq")
    public class MqPropertiesConfig {
        private String onsAddr;
        private String topic;
        private String accessKey;
        private String secretKey;
        private Properties producer;
        private Properties consumer;
        private String tagSuffix;
    
        setter/getter;
    }
     
    3自動配置クラスを定義します.そのうち @Coditional OnProperty(prefix=「aliyun.mq.com nsumer」、value=「enabled」、havingValue=「true」)は、設定されている属性の中に、属性aliyun.mq.com.enabledが存在し、trueの値があるときに、このBeanが実用化されることを示しています.いくつかのアプリケーションは、生産者または消費者のみが必要であるため、この属性によって、対応するBeaビンが実装されているかどうかを制御することができる.
    @Configuration
    @EnableConfigurationProperties(MqPropertiesConfig.class)
    public class MqAutoConfig {
    
        @Autowired
        private MqPropertiesConfig propConfig;
    
        @Bean(initMethod="start", destroyMethod = "shutdown")
        @ConditionalOnMissingBean
        @ConditionalOnProperty(prefix = "aliyun.mq.producer",value = "enabled",havingValue = "true")
        public MqTimerProducer mqTimerProducer(){
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.ProducerId, propConfig.getProducer().getProperty("producerId"));
            properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
            properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
            properties.setProperty(PropertyKeyConst.ONSAddr, propConfig.getOnsAddr());
            properties.setProperty("topic", propConfig.getTopic());
            return  new MqTimerProducer(properties);
        }
    
        @Bean(initMethod="start", destroyMethod = "shutdown")
        @ConditionalOnMissingBean
        @ConditionalOnProperty(prefix = "aliyun.mq.consumer",value = "enabled",havingValue = "true")
        public MqConsumer mqConsumer(){
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.ConsumerId, propConfig.getConsumer().getProperty("consumerId"));
            properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
            properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
            properties.setProperty(PropertyKeyConst.ONSAddr, propConfig.getOnsAddr());
            properties.setProperty("topic", propConfig.getTopic());
            return  new MqConsumer(properties);
        }
    }
      
    4.生産者を定義する.send方法は、同期された方法でメッセージコンテンツbodyを一定の遅延後、指定メッセージキューtopicに送信し、タグがタグである(消費者は、同じtopicのメッセージをフィルタすることができる).sendAsyncは、非同期的にメッセージを送信し、メッセージの送信が完了したら、指定されたコールバックSendCallbackによってカスタム処理を行う.ここではデフォルトのコールバックはログでのみ記録されます.
    public class MqTimerProducer {
        private final static Logger LOG = LoggerFactory.getLogger(MqTimerProducer.class);
        private Properties properties;
        private Producer producer;
        private String topic;
    
        public MqTimerProducer(Properties properties) {
            if (properties == null || properties.get(PropertyKeyConst.ProducerId) == null
                    || properties.get(PropertyKeyConst.AccessKey) == null
                    || properties.get(PropertyKeyConst.SecretKey) == null
                    || properties.get(PropertyKeyConst.ONSAddr) == null
                    || properties.get("topic") == null) {
                throw new ONSClientException("producer properties not set properly.");
            }
            this.properties = properties;
            this.topic = properties.getProperty("topic");
        }
    
        public void start() {
            this.producer = ONSFactory.createProducer(this.properties);
            this.producer.start();
        }
    
        public void shutdown() {
            if (this.producer != null) {
                this.producer.shutdown();
            }
        }
    
        public void send(String tag, String body, long delay) {
            LOG.info("start to send message. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
            if (topic == null || tag == null || body == null) {
                throw new RuntimeException("topic, tag, or body is null.");
            }
            Message message = new Message(topic, tag, body.getBytes());
            message.setStartDeliverTime(System.currentTimeMillis() + delay);
            SendResult result = this.producer.send(message);
            LOG.info("send message success. ", result.toString());
        }
    
    
        public void sendAsync(String tag, String body, long delay) {
            this.sendAsync(tag, body, delay, new DefaultSendCallback());
        }
    
        public void sendAsync(String tag, String body, long delay, SendCallback sendCallback) {
            LOG.info("start to send message async. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
            if (topic == null || tag == null || body == null) {
                throw new RuntimeException("topic, tag, or body is null.");
            }
            Message message = new Message(topic, tag, body.getBytes());
            message.setStartDeliverTime(System.currentTimeMillis() + delay);
            this.producer.sendAsync(message, sendCallback);
        }
    
        setter/getter;
    }
     
    5消費者を定義する.subscribe方法は、topicを指定するいくつかのタグtagsをメッセージ購読し、このtopicにこれらのtagsを含むメッセージが到着すると、message Listenerによって処理される.ここでは、抽象的なクラスのAbstractMessage Listenerを定義し、テンプレート方法によってメッセージの処理ロジックを統一し(正常消費、comit;異常が発生し、再消費)、消費者はAbstractMessage Listenerを継承するだけで、handle方法でメッセージ消費を完了することができます.
    public class MqConsumer {
    
        private final static Logger LOG = LoggerFactory.getLogger(MqConsumer.class);
        private Properties properties;
        private Consumer consumer;
        private String topic;
    
        public MqConsumer(Properties properties) {
            if (properties == null || properties.get(PropertyKeyConst.ConsumerId) == null
                    || properties.get(PropertyKeyConst.AccessKey) == null
                    || properties.get(PropertyKeyConst.SecretKey) == null
                    || properties.get(PropertyKeyConst.ONSAddr) == null
                    || properties.get("topic") == null) {
                throw new ONSClientException("consumer properties not set properly.");
            }
            this.properties = properties;
            this.topic = properties.getProperty("topic");
        }
    
        public void start() {
            this.consumer = ONSFactory.createConsumer(properties);
            this.consumer.start();
        }
    
        public void shutdown() {
            if (this.consumer != null) {
                this.consumer.shutdown();
            }
        }
    
        /**
         * @param tags              tag '||'  ,   *
         * @param messageListener
         */
        public void subscribe(String tags, AbstractMessageListener messageListener) {
            LOG.info("subscribe [topic: {}, tags: {}, messageListener: {}]", topic, tags, messageListener.getClass().getCanonicalName());
            consumer.subscribe(topic, tags, messageListener);
        }
    }
    public abstract class AbstractMessageListener implements MessageListener {
    
        private final static Logger LOG = LoggerFactory.getLogger(AbstractMessageListener.class);
    
        public abstract void handle(String body);
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            LOG.info("receive message. [topic: {}, tag: {}, body: {}, msgId: {}, startDeliverTime: {}]", message.getTopic(), message.getTag(), new String(message.getBody()), message.getMsgID(), message.getStartDeliverTime());
            try {
                handle(new String(message.getBody()));
                LOG.info("handle message success.");
                return Action.CommitMessage;
            } catch (Exception e) {
                //    
                LOG.warn("handle message fail, requeue it.", e);
                return Action.ReconsumeLater;
            }
        }
    }
     
    6前にすべてのメッセージ・キューサービスに関するコードの実現が完了しました.参照項目を自動的に配置するには、META-INF/spring.factoresファイルを定義し、自動配置クラスをorg.sprigframe ork.boot.atoconfigre.EnbaleAutoConfigrationに割り当てます.
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.ieyecloud.springboot.mq.config.MqAutoConfig
     
    7使用
    7.1 pom.xml導入依存、現在のバージョン:1.0.0-NAPSHOT
    
         com.ieyecloud
         mq-spring-boot-starter
         1.0-SNAPSHOT
     
    7.2 apprationプロファイルに対応する構成を追加する
    aliyun:
         mq:
             onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
             topic: mulin_topic_test
             accessKey: xxx
             secretKey: xxx
             producer:
                 enabled: true  # false     producer, true producerId    
                 producerId: xxx
             consumer:
                 enabled: true  # false     consumer, true consumerId    
             consumerId: xxx
    7.3 producerを使用して、consumerは必要に応じて対応するインスタンスを注入するだけである.
    @Autowired
     private MqTimerProducer producer;
    
     @Autowired
     private MqConsumer consumer;
    7.4 consumer傍受処理類が実現し、AbstractMessage Listener類を継承し、handle方法を実現すればいいです.
     @Component
     public class QuestionStatusMessageListener extends AbstractMessageListener{
    
         @Autowired
         private QuickQuestionService questionService;
    
         @Override
         public void handle(String s) {
             QuestionStatusMessage message = JsonUtil.fromJson(s, QuestionStatusMessage.class);
             questionService.updateStatus(message.getQid(), message.getCs(), message.getTs());
         }
     }