ラビットMQ学習ノート3-topicスイッチを使用する
23122 ワード
topicのルートルールでは「.」を使って単語を分離し、「*」を使って単語を1つに合わせ、複数の単語を「〹」で合わせます.
以下の例では、
logger.*はlogger.errorとlogger.warningにマッチしますが、logger*.errorはlogger.errorにしかマッチしません.
loginger.errorとlogger.warningに整合することができます.
以下の例では、topicを使用して警告、エラーのログを受信し、マッチングしたルーティングルールに従って異なるQueキューに送信して処理する例を示している.
日誌プロデューサーSenderWithTopicExchange
以下の例では、
logger.*はlogger.errorとlogger.warningにマッチしますが、logger*.errorはlogger.errorにしかマッチしません.
loginger.errorとlogger.warningに整合することができます.
以下の例では、topicを使用して警告、エラーのログを受信し、マッチングしたルーティングルールに従って異なるQueキューに送信して処理する例を示している.
日誌プロデューサーSenderWithTopicExchange
1 package com.yzl.test2;
2
3 import java.util.concurrent.CountDownLatch;
4 import java.util.concurrent.ExecutorService;
5 import java.util.concurrent.Executors;
6
7 import com.rabbitmq.client.Channel;
8 import com.rabbitmq.client.Connection;
9 import com.rabbitmq.client.ConnectionFactory;
10
11 /**
12 * topic
13 * 2
14 * @author: yzl
15 * @date: 2016-10-22
16 */
17 public class SenderWithTopicExchange {
18 //
19 private static final String EXCHANGE_NAME = "myTopicExchange";
20 //
21 private static final String BASE_ROUTING_KEY = "logger.";
22
23 public static void main(String[] args) throws Exception {
24 // CountDownLatch 2
25 final CountDownLatch cdl = new CountDownLatch(2);
26 // rabbitmq
27 ConnectionFactory factory = new ConnectionFactory();
28 factory.setHost("localhost");
29 Connection connection = factory.newConnection();
30 //
31 final Channel channel = connection.createChannel();
32 // topicExchange topic exchange
33 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
34
35 ExecutorService pool = Executors.newFixedThreadPool(2);
36 pool.submit(new Runnable() {
37 @Override
38 public void run() {
39 try {
40 cdl.await();
41 // , :logger.warning
42 String warningMsg = "warning message is :";
43 for(int i=1; i<800; i++){
44 System.out.println(" :" + warningMsg+i);
45 channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "warning", null, (warningMsg+i).getBytes());
46 Thread.sleep(2000L);
47 }
48 } catch (Exception e) {
49 e.printStackTrace();
50 }
51 }
52 });
53 pool.submit(new Runnable() {
54 @Override
55 public void run() {
56 try {
57 cdl.await();
58 // , :logger.error
59 String errorMsg = "error message is :";
60 for(int i=1; i<1000; i++){
61 System.out.println(" :" + errorMsg+i);
62 channel.basicPublish(EXCHANGE_NAME, BASE_ROUTING_KEY + "error", null, (errorMsg+i).getBytes());
63 Thread.sleep(2000L);
64 }
65 } catch (Exception e) {
66 e.printStackTrace();
67 }
68 }
69 });
70
71 cdl.countDown();
72 cdl.countDown();
73 }
74 }
ニュース消費者Receiver WithTopicExchange 1 package com.yzl.test2;
2
3 import java.io.IOException;
4
5 import com.rabbitmq.client.AMQP.BasicProperties;
6 import com.rabbitmq.client.Channel;
7 import com.rabbitmq.client.Connection;
8 import com.rabbitmq.client.ConnectionFactory;
9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11
12 /**
13 * topic
14 *
15 * @author: yzl
16 * @date: 2016-10-22
17 */
18 public class ReceiverWithTopicExchange {
19 //
20 private static final String EXCHANGE_NAME = "myTopicExchange";
21
22 public static void main(String[] args) throws Exception {
23 // rabbitmq
24 ConnectionFactory factory = new ConnectionFactory();
25 factory.setHost("localhost");
26 Connection connection = factory.newConnection();
27 //
28 final Channel channel = connection.createChannel();
29 // topicExchange topic exchange
30 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
31
32 //
33 channel.queueDeclare("warningQueue", false, false, false, null);
34 //
35 channel.queueDeclare("errorQueue", false, false, false, null);
36 //
37 channel.queueDeclare("allLoggerQueue", false, false, false, null);
38
39 // logger.warning myTopicExchange warningQueue
40 channel.queueBind("warningQueue", EXCHANGE_NAME, "logger.warning");
41 // logger.error myTopicExchange errorQueue
42 channel.queueBind("errorQueue", EXCHANGE_NAME, "logger.error");
43 // logger.* myTopicExchange allLoggerQueue
44 channel.queueBind("allLoggerQueue", EXCHANGE_NAME, "logger.*");
45
46 // , ack
47 channel.basicConsume("warningQueue", false, new DefaultConsumer(channel){
48 @Override
49 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
50 throws IOException {
51 String msg = new String(body);
52 System.out.println("warningQueue accept a warning msg :" + msg);
53 channel.basicAck(envelope.getDeliveryTag(), false);
54 }
55 });
56 // , ack
57 channel.basicConsume("errorQueue", false, new DefaultConsumer(channel){
58 @Override
59 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
60 throws IOException {
61 String msg = new String(body);
62 System.out.println("errorQueue accept a error msg :" + msg);
63 channel.basicAck(envelope.getDeliveryTag(), false);
64 }
65 });
66 // , ack
67 channel.basicConsume("allLoggerQueue", false, new DefaultConsumer(channel){
68 @Override
69 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
70 throws IOException {
71 String msg = new String(body);
72 System.out.println("allLoggerQueue accept a logger msg :" + msg);
73 channel.basicAck(envelope.getDeliveryTag(), false);
74 }
75 });
76 }
77 }
結果出力: :warning message is :1
:error message is :1
:warning message is :2
:error message is :2
:error message is :3
:warning message is :3
allLoggerQueue accept a logger msg :error message is :1 allLoggerQueue accept a logger msg :warning message is :1
errorQueue accept a error msg :error message is :1
warningQueue accept a warning msg :warning message is :1
warningQueue accept a warning msg :warning message is :2
errorQueue accept a error msg :error message is :2 allLoggerQueue accept a logger msg :warning message is :2 allLoggerQueue accept a logger msg :error message is :2 allLoggerQueue accept a logger msg :warning message is :3
errorQueue accept a error msg :error message is :3
warningQueue accept a warning msg :warning message is :3 allLoggerQueue accept a logger msg :error message is :3
メッセージ処理の流れ: