ラビットMQ学習ノート3-topicスイッチを使用する

23122 ワード

topicのルートルールでは「.」を使って単語を分離し、「*」を使って単語を1つに合わせ、複数の単語を「〹」で合わせます.
以下の例では、
logger.*はlogger.errorとlogger.warningにマッチしますが、logger*.errorはlogger.errorにしかマッチしません.
loginger.errorとlogger.warningに整合することができます.
以下の例では、topicを使用して警告、エラーのログを受信し、マッチングしたルーティングルールに従って異なるQueキューに送信して処理する例を示している.
日誌プロデューサーSenderWithTopicExchange
 1 package com.yzl.test2;
 2 
 3 import java.util.concurrent.CountDownLatch;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 /**
12  *   topic       
13  *        2   
14  * @author: yzl
15  * @date: 2016-10-22
16  */
17 public class SenderWithTopicExchange {
18     //     
19     private static final String EXCHANGE_NAME = "myTopicExchange";
20     //      
21     private static final String BASE_ROUTING_KEY = "logger.";
22     
23     public static void main(String[] args) throws Exception {
24         //  CountDownLatch  2       
25         final CountDownLatch cdl = new CountDownLatch(2);
26         //   rabbitmq   
27         ConnectionFactory factory = new ConnectionFactory();
28         factory.setHost("localhost");
29         Connection connection = factory.newConnection();
30         //      
31         final Channel channel = connection.createChannel();
32         //       topicExchange topic   exchange
33         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
34         
35         ExecutorService pool = Executors.newFixedThreadPool(2);
36         pool.submit(new Runnable() {
37             @Override
38             public void run() {
39                 try {
40                     cdl.await();
41                     //      ,     :logger.warning
42                     String warningMsg = "warning message is :";
43                     for(int i=1; i<800; i++){
44                         System.out.println("      :" + warningMsg+i);
45                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "warning", null, (warningMsg+i).getBytes());
46                         Thread.sleep(2000L);
47                     }
48                 } catch (Exception e) {
49                     e.printStackTrace();
50                 }
51             }
52         });
53         pool.submit(new Runnable() {
54             @Override
55             public void run() {
56                 try {
57                     cdl.await();
58                     //      ,     :logger.error
59                     String errorMsg = "error message is :";
60                     for(int i=1; i<1000; i++){
61                         System.out.println("      :" + errorMsg+i);
62                         channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "error", null, (errorMsg+i).getBytes());
63                         Thread.sleep(2000L);
64                     }
65                 } catch (Exception e) {
66                     e.printStackTrace();
67                 }
68             }
69         });
70         
71         cdl.countDown();
72         cdl.countDown();
73     }
74 }
ニュース消費者Receiver WithTopicExchange
 1 package com.yzl.test2;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.AMQP.BasicProperties;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11 
12 /**
13  *   topic       
14  * 
15  * @author: yzl
16  * @date: 2016-10-22
17  */
18 public class ReceiverWithTopicExchange {
19     //      
20     private static final String EXCHANGE_NAME = "myTopicExchange";
21 
22     public static void main(String[] args) throws Exception {
23         //    rabbitmq   
24         ConnectionFactory factory = new ConnectionFactory();
25         factory.setHost("localhost");
26         Connection connection = factory.newConnection();
27         //       
28         final Channel channel = connection.createChannel();
29         //        topicExchange topic   exchange
30         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
31         
32         //           
33         channel.queueDeclare("warningQueue", false, false, false, null);
34         //           
35         channel.queueDeclare("errorQueue", false, false, false, null);
36         //               
37         channel.queueDeclare("allLoggerQueue", false, false, false, null);
38         
39         //  logger.warning     myTopicExchange warningQueue
40         channel.queueBind("warningQueue", EXCHANGE_NAME, "logger.warning");
41         //  logger.error     myTopicExchange errorQueue
42         channel.queueBind("errorQueue", EXCHANGE_NAME, "logger.error");
43         //  logger.*      myTopicExchange allLoggerQueue
44         channel.queueBind("allLoggerQueue", EXCHANGE_NAME, "logger.*");
45         
46         //       ,    ack  
47         channel.basicConsume("warningQueue", false, new DefaultConsumer(channel){
48             @Override
49             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
50                     throws IOException {
51                 String msg = new String(body);
52                 System.out.println("warningQueue accept a warning msg :" + msg);
53                 channel.basicAck(envelope.getDeliveryTag(), false);
54             }
55         });
56         //       ,    ack  
57         channel.basicConsume("errorQueue", false, new DefaultConsumer(channel){
58             @Override
59             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
60                     throws IOException {
61                 String msg = new String(body);
62                 System.out.println("errorQueue accept a error msg :" + msg);
63                 channel.basicAck(envelope.getDeliveryTag(), false);
64             }
65         });
66         //         ,    ack  
67         channel.basicConsume("allLoggerQueue", false, new DefaultConsumer(channel){
68             @Override
69             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
70                     throws IOException {
71                 String msg = new String(body);
72                 System.out.println("allLoggerQueue accept a logger msg :" + msg);
73                 channel.basicAck(envelope.getDeliveryTag(), false);
74             }
75         });
76     }
77 }
結果出力:
      :warning message is :1
      :error message is :1
      :warning message is :2
      :error message is :2
      :error message is :3
      :warning message is :3
allLoggerQueue accept a logger msg :error message is :1 allLoggerQueue accept a logger msg :warning message is :1
errorQueue accept a error msg :error message is :1
warningQueue accept a warning msg :warning message is :1
warningQueue accept a warning msg :warning message is :2
errorQueue accept a error msg :error message is :2 allLoggerQueue accept a logger msg :warning message is :2 allLoggerQueue accept a logger msg :error message is :2 allLoggerQueue accept a logger msg :warning message is :3
errorQueue accept a error msg :error message is :3
warningQueue accept a warning msg :warning message is :3 allLoggerQueue accept a logger msg :error message is :3
メッセージ処理の流れ: