Spring Boot入門のニュース中間件の使用


はじめに
メッセージ中間には2つの重要な概念がある。メッセージエージェントと宛先。メッセージ送信者がメッセージを送信すると、メッセージはメッセージエージェントによって引き継がれ、メッセージエージェント保証メッセージは指定された宛先に伝達される。
私たちがよく使うメッセージエージェントはJMSとAMQP仕様があります。対応して、それらの一般的な実装は、それぞれActiveMQおよびRabbiitMQである。
二、ActiveMQの整合
2.1依存関係を追加

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--          ,       -->
<dependency> 
  <groupId>org.apache.activemq</groupId> 
  <artifactId>activemq-pool</artifactId> 
</dependency>
2.2プロファイルを追加

# activemq   
spring.activemq.broker-url=tcp://192.168.2.12:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=false
spring.activemq.pool.max-connections=50
#     /     ,          true
spring.jms.pub-sub-domain=false
ここのspring.activemq.pool.enabled=falseは接続池を閉じるという意味です。
2.3エンコーディング
クラスを設定:

@Configuration
public class JmsConfirguration {
  public static final String QUEUE_NAME = "activemq_queue";
  
  public static final String TOPIC_NAME = "activemq_topic";
  
  @Bean
  public Queue queue() {
    return new ActiveMQQueue(QUEUE_NAME);
  }
  
  @Bean
  public Topic topic() {
    return new ActiveMQTopic(TOPIC_NAME);
  }
}
キューとテーマの作成を担当します。
メッセージ生産者:

@Component
public class JmsSender {
  @Autowired
  private Queue queue;
  
  @Autowired
  private Topic topic;
  
  @Autowired
  private JmsMessagingTemplate jmsTemplate;
  
  public void sendByQueue(String message) {
    this.jmsTemplate.convertAndSend(queue, message);
  }
  
  public void sendByTopic(String message) {
    this.jmsTemplate.convertAndSend(topic, message);
  }
}
メッセージ消費者:

@Component
public class JmsReceiver {
  
  @JmsListener(destination = JmsConfirguration.QUEUE_NAME)
  public void receiveByQueue(String message) {
    System.out.println("      :" + message);
  }
  
  @JmsListener(destination = JmsConfirguration.TOPIC_NAME)
  public void receiveByTopic(String message) {
    System.out.println("      :" + message);
  }
}
ニュース消費者は@Jms Listenerを使って、ニュースを監督する。
2.4試験

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsTest {
  @Autowired
  private JmsSender sender;
  @Test
  public void testSendByQueue() {
    for (int i = 1; i < 6; i++) {
      this.sender.sendByQueue("hello activemq queue " + i);
    }
  }
  
  @Test
  public void testSendByTopic() {
    for (int i = 1; i < 6; i++) {
      this.sender.sendByTopic("hello activemq topic " + i);
    }
  }
}
印刷結果:
キューメッセージを受信:hello activemq queue 1
キューメッセージを受信:hello activemq queue 2
キューメッセージ受信:ハローactivemq queue 3
キューメッセージを受信:hello activemq queue 4
キューメッセージの受信:hello activemq queue 5
リリース/購読モードをテストする時、spring.jms.pb-sub-doman=trueを設定します。
主題メッセージを受信:ハローactivemq topic 1
主題メッセージを受信する:hello activemq topic 2
主題メッセージを受信:ハローactivemq topic 3
主題メッセージを受信する:hello activemq topic 4
主題メッセージを受信する:hello activemq topic 5
三、ラビットMQを整合する。
3.1依存を追加

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2プロファイルを追加

spring.rabbitmq.host=192.168.2.30
spring.rabbitmq.port=5672
spring.rabbitmq.username=light
spring.rabbitmq.password=light
spring.rabbitmq.virtual-host=/test
3.3符号化
クラスを設定:

@Configuration
public class AmqpConfirguration {
  //=============  、      ===============
  
  public static final String SIMPLE_QUEUE = "simple_queue";
  @Bean
  public Queue queue() {
    return new Queue(SIMPLE_QUEUE, true);
  }
  
  //===============  /    ============
  
  public static final String PS_QUEUE_1 = "ps_queue_1";
  public static final String PS_QUEUE_2 = "ps_queue_2";
  public static final String FANOUT_EXCHANGE = "fanout_exchange";
  
  @Bean
  public Queue psQueue1() {
    return new Queue(PS_QUEUE_1, true);
  }
  
  @Bean
  public Queue psQueue2() {
    return new Queue(PS_QUEUE_2, true);
  }
  
  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(FANOUT_EXCHANGE);
  }
  
  @Bean
  public Binding fanoutBinding1() {
    return BindingBuilder.bind(psQueue1()).to(fanoutExchange());
  }
  
  @Bean
  public Binding fanoutBinding2() {
    return BindingBuilder.bind(psQueue2()).to(fanoutExchange());
  }
  //===============    ============
  
  public static final String ROUTING_QUEUE_1 = "routing_queue_1";
  public static final String ROUTING_QUEUE_2 = "routing_queue_2";
  public static final String DIRECT_EXCHANGE = "direct_exchange";
  
  @Bean
  public Queue routingQueue1() {
    return new Queue(ROUTING_QUEUE_1, true);
  }
  
  @Bean
  public Queue routingQueue2() {
    return new Queue(ROUTING_QUEUE_2, true);
  }
  
  @Bean
  public DirectExchange directExchange() {
    return new DirectExchange(DIRECT_EXCHANGE);
  }
  
  @Bean
  public Binding directBinding1() {
    return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user");
  }
  
  @Bean
  public Binding directBinding2() {
    return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order");
  }
  
  //===============    ============
  
  public static final String TOPIC_QUEUE_1 = "topic_queue_1";
  public static final String TOPIC_QUEUE_2 = "topic_queue_2";
  public static final String TOPIC_EXCHANGE = "topic_exchange";
  
  @Bean
  public Queue topicQueue1() {
    return new Queue(TOPIC_QUEUE_1, true);
  }
  
  @Bean
  public Queue topicQueue2() {
    return new Queue(TOPIC_QUEUE_2, true);
  }
  
  @Bean
  public TopicExchange topicExchange() {
    return new TopicExchange(TOPIC_EXCHANGE);
  }
  
  @Bean
  public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add");
  }
  
  @Bean
  public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
  }  
}
RabbiitMQは複数の動作モードがあるので、配置が多いです。関連内容を知りたい方は、「RabbiitMQ仕事パターン紹介」または自分のBaidu関連資料を閲覧してください。
メッセージ生産者:

@Component
public class AmqpSender {
  @Autowired
  private AmqpTemplate amqpTemplate;
  /**
   *       
   * 
   * @param message
   */
  public void simpleSend(String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message);
  }
  /**
   *   /      
   * 
   * @param message
   */
  public void psSend(String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message);
  }
  /**
   *       
   * 
   * @param message
   */
  public void routingSend(String routingKey, String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message);
  }
  /**
   *       
   * 
   * @param routingKey
   * @param message
   */
  public void topicSend(String routingKey, String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message);
  }
}
メッセージ消費者:

@Component
public class AmqpReceiver {
  /**
   *       
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE)
  public void simpleReceive(String message) {
    System.out.println("    :" + message);
  }
  /**
   *   /      
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1)
  public void psReceive1(String message) {
    System.out.println(AmqpConfirguration.PS_QUEUE_1 + "    :" + message);
  }
  @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2)
  public void psReceive2(String message) {
    System.out.println(AmqpConfirguration.PS_QUEUE_2 + "    :" + message);
  }
  /**
   *       
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1)
  public void routingReceive1(String message) {
    System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "    :" + message);
  }
  @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2)
  public void routingReceive2(String message) {
    System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "    :" + message);
  }
  /**
   *       
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1)
  public void topicReceive1(String message) {
    System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "    :" + message);
  }
  
  @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2)
  public void topicReceive2(String message) {
    System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "    :" + message);
  }
}
メッセージ消費者は@Rabbit Listenerを使ってメッセージを傍受する。
3.4テスト

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpTest {
  @Autowired
  private AmqpSender sender;
  @Test
  public void testSimpleSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.simpleSend("test simpleSend " + i);
    }
  }
  @Test
  public void testPsSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.psSend("test psSend " + i);
    }
  }
  
  @Test
  public void testRoutingSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.routingSend("order", "test routingSend " + i);
    }
  }
  
  @Test
  public void testTopicSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.topicSend("user.add", "test topicSend " + i);
    }
  }
}
テストの結果は省略しました。
ピットを踏んで注意する1:ACCESS_REFSED C Login was refused using authentication mechange ism PLAN
ソリューション:
1)ユーザー名とパスワードが正しいかどうかを確認してください。ユーザ名とパスワードの値にスペースやタブが含まれているかどうかは注意してください。
2)テストアカウントがgustである場合、rabitmq.comファイルを修正する必要があります。このファイルに「loopback_」を追加します。users=none」配置。
ステップリマインド2:Canot prepare queue for listener.Either the queue doesn't exist or the breoker will not allow us to use it
ソリューション:
私たちはRabbiitMQの管理画面にログインして、Queオプションに手動で対応するキューを追加できます。
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。