RocketMQダイナミックリスニングTopic

28861 ワード

紹介する
RocketMQはアリ開園の高性能でスループットの高い消費ミドルウェアとして、アリ双十一の大部分の業務を担っており、戦場を経たエリートで信頼できるパートナーと言える.同時にその開発言語はJavaであり、自然に多くのインターネットアーキテクチャの人々の愛顧を得て、インターネット業界の第一選択のメッセージミドルウェアとなっている.
通常、RocketMQの消費者は単一の例であり、Topicの傍受も事前に定義されており、サービスが開始されるにつれて変化しない.しかし、サービスが初期化されると、生産者と消費者がTopicTest 1をリスニングするなど、動的リスニングTopicのニーズが避けられない.このとき、サービス実行中にTopicTest 2が作成されます.動的リスニングTopicは、コードを修正する必要もなく、サービスを再起動する必要もなく、TopicTest 2のデータを消費することができます.
簡単な例
copy公式サイトの一例で、Topicダイナミックリスニングはこの例に基づいて修正されます.
1.maven依存
<dependency>
   <groupId>org.apache.rocketmqgroupId>
    <artifactId>rocketmq-clientartifactId>
    <version>4.3.0version>
dependency>

2.メッセージの同期送信
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("my_test_group");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest1" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

3.消費メッセージ
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_test_group");
         
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest1", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

動的リスニングTopic
動的リスニングTopicの主な考え方は簡単で、実際にはconsumerのtopicを再購読し、再リスニングすることです.
1.生産者
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("my_test_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        while (true){
            System.out.print("
\r topic:"
); Scanner scanner = new Scanner(System.in); int num = scanner.nextInt(); try { Message msg = new Message("TopicTest"+num /* test.Topic */, "TagA" /* Tag */, ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } } }

2.動的リスニングツールクラスの作成
public class MyMQPushConsumerUtil {

    private static DefaultMQPushConsumer consumer;

	//    consumer
    public static DefaultMQPushConsumer getInstance(){
        if (consumer == null){
            DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer("my_test_group");
            dconsumer.setNamesrvAddr("localhost:9876");
            dconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            dconsumer.registerMessageListener(new MessageListenerConcurrently() {

                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer = dconsumer;
        }

        return consumer;
    }


    public static void start(){
        try {
            consumer.start();

        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

	//     topic   ,        
	//   :     setConsumer  ,topic          
    public static void setConsumer(String topic) {

            try {
                consumer.subscribe(topic, "*");
                consumer.registerMessageListener(new MessageListenerConcurrently() {

                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                    ConsumeConcurrentlyContext context) {
                        System.out.printf("\r
%s Receive New Messages: %s %n"
, Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); } catch (MQClientException e) { e.printStackTrace(); } } }

3.消費者
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        MyMQPushConsumerUtil.getInstance();
        MyMQPushConsumerUtil.start();
        while (true){
                System.out.println("
\r topic:"
); Scanner scanner = new Scanner(System.in); String str = scanner.next(); MyMQPushConsumerUtil.setConsumer(str); } } }

まとめ
コードはとても簡単で、構想はこのような意味で、各位のお兄さんに少し役に立つことを望みます.その後、「RocketMQ消費優先度」と「javaコードでtopicを手動で作成する方法」の2つのシナリオも提供されます.公式ドキュメント:http://rocketmq.apache.org/docs/quick-start/