RocketMQ簡易パッケージ

4184 ワード

common-rocketmq紹介
このプロジェクトはrocketmqに対して実際のプロジェクト開発の中で簡単なパッケージを作って、使用は簡単で、common-rocketmqはJDK 8に基づくので、githubアドレス:https://github.com/xinyi-lt/common-rocketmq次はcommon-rocketmq構造です.
クラス5個とXMLプロファイル1個を含む
rocketmq---------common-------MessageData(    DTO)
            |
            |---producer----------RocketMQProducer(     )
            |               |
            |               |---RocketMQProducerImpl(       )
            |---consumer------RocketMQConsumer(   )
                            |
                            |---ConsumerService(              )
common-rocketmq.xml

使用例
demoのgithubアドレス:https://github.com/xinyi-lt/common-rocketmq-demo
1.maven依存

    com.sxzq.lt
    common-rocketmq
    1.0-SNAPSHOT


2.producer生産者
  • プロファイル導入
  •     
    
  • コードにproducer
  • を注入する
            RocketMQProducer producer = applicationContext.getBean(RocketMQProducer.class);
    
  • 送信メッセージ
  •         //    DTO
            MessageData messageData = new MessageData<>();
    
            //   
            messageData.setTimestamp(System.currentTimeMillis());
            //      UUID   
            messageData.setUuid(UUID.randomUUID().toString());
    
            UserInfo userInfo = new UserInfo();
            //useinfo  
    
            messageData.setData(userInfo);
    
            logger.info("start to send message data:{}", JSON.toJSONString(messageData));
    
            //    
            SendResult result = producer.sendMessage(
                    new Message(MQConstant.MQ_TOPIC_PRODUCER_DEMO,
                    MQConstant.MQ_TAG_PRODUCER_DEMO_FIRST,
                    UUID.randomUUID().toString(),
                    JSON.toJSONString(messageData).getBytes(Charset.forName("utf-8"))));
    
            logger.info("send message complete result:{} ", result);
    
  • 送信遅延メッセージ
          //  
            Message msg = new Message(MQConstant.MQ_TOPIC_PRODUCER_DEMO,
                    MQConstant.MQ_TAG_PRODUCER_DEMO_FIRST,
                    msgKey,
                    JSON.toJSONString(messageData).getBytes(Charset.forName("utf-8")));
    
            //        
            // messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            msg.setDelayTimeLevel(4);
    
            //    
            SendResult result = producer.sendMessage(msg);
    
  • 3.consumer消費者
  • プロファイル導入
     
    
  • common-rocketmqにおけるConsumerServiceを実現するインタフェース
  •         public class DemoConsumerService implements ConsumerService {
               @Override
                public boolean consume(Message message) {
       
                      //       
                      return consumeResult;
                }
            }
    
  • DemoConsumerService springコンテナに追加管理
  •        
    
  • 事業例を作成する消費者
  • .