rocketmq-spring-boot-starterの消費者メッセージ確認


最近rocketmqメッセージミドルウェアを作りましたが、Apacheが提供するspringbootパッケージのrocketmq-spring-boot-starterを使用しました.バージョンは2.1です.0の場合、生産者の使用方法は、他のメッセージ・ミドルウェアと同様に次のようになります.
@Resource
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping(value = "/mq/{name}")
    public String qmtest(@PathVariable("name")String name) throws Exception {
        //        Broker
        SendResult sendResult = rocketMQTemplate.syncSend("TopicTest", name);
        //   sendResult          
        System.out.printf("%s%n", sendResult);
        return null;
    }

消費者は主にorg.に多くのリスニングクラスを提供している.apache.rocketmq.spring.coreという経路の下で、ここでは一つ一つ説明しません.私が言いたい問題は、これらの傍受インタフェースの中でメッセージ消費が成功したかどうかを確認するメカニズムを提供していないことです.これは少し気が狂います.もしメッセージが正しく消費できなければ、私はどのように再送信したいですか.これは頼りになりません.次の消費実現インタフェースの方法を見てみましょう.
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",
        topic = "${carInInfo.topic}", selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener {
   
    /**
     *          ,            
     *
     * @param addCarInParkDTO
     */
    @Override
    public void onMessage(AddCarInParkDTO addCarInParkDTO) {
        System.out.println("    :" + JSON.toJSONString(addCarInParkDTO));
    }
}

実装方式は簡単でしょうが、コードには消費が成功したかどうかを確認するメッセージがありません.実装のonMessage()方法はvoidなので、元のrocketmqの消費者実装方式、つまりrocketmq-clientを見たことがあります.JArの実装はMessageListenerですJAvaクラスはメッセージ傍受受信を実現するために、2つの継承インタフェースクラスMessageListenerConcurrentlyがある.JAvaとMessageListenerOrderly.JAva、これで探しやすいので、この2つのインタフェースの実装クラスを直接受け取って、おとなしくて、やはりrocket-spring-bootのjarの中で見つけて、DefaultRocketMQListenerContainerです.JAvaというクラスは、次のいずれかの実装を見てみましょう.
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

見たか、異常がなければ消費に成功し、異常があれば再送するのですが、これはどこで呼び出されたのでしょうか.このprivateの方法を見ればわかります
private void initRocketMQPushConsumer() throws MQClientException {
       ......

        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }

    }

同じようにJAvaこのクラスではspringのInitializingBeanを実現するため呼び出し設定が行われています.JAvaインタフェースは、afterPropertiesSet()メソッドで初期化されているので、安心して消費することができます.もちろん、メッセージの消費回数を記録することもできます.異常回数が来たら、専門のテーブルに記録することができます.同時に、異常をキャプチャして、メッセージを再送信しないようにすることができます.