Springboot RabbiitMQ文書を配置する方法手順


概要
RabbiitMQは、AMQP(高級メッセージキュープロトコル)を実装するメッセージ中間デバイスの一つであり、分散システムに転送メッセージを格納するためのものであり、使用性、拡張性、利用可能性の高いなどの点で、一般的ではないことを示している。
概念:
  • 生産者メッセージの生成者は、メッセージをメッセージキュー
  • に送る責任がある。
  • 消費者メッセージの最終的な受信者は、キュー内の対応するメッセージを傍受し、消費メッセージ
  • キューメッセージのレジスタは、生産者が送信するメッセージ
  • を格納する。
  • スイッチは、ある規則に従って生産者によって生成されたメッセージを配信する責任がある
  • バインディング完了スイッチとキュー間のバインディング
  • モード:
  • direct:直接モードで、インスタンス間のタスクの配信に
  • topic:話題パターンは、exchangeに結び付けられたキュー
  • に構成可能な規則によって配信される。
  • headers:適用規則が複雑な配布は、headers内のパラメータでルールを表現する
  • fanout:exchangeに結合されたすべての列に配布し、routing keyを無視する。
  • Spring Boot統合RabbiitMQ
    一、本道の導入依存
    
    <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
     <version>1.5.2.RELEASE</version>
    </dependency>
    二、配置appication.properties
    
    # rabbitmq
    spring.rabbitmq.host = dev-mq.a.pa.com
    spring.rabbitmq.port = 5672
    spring.rabbitmq.username = admin
    spring.rabbitmq.password = admin
    spring.rabbitmq.virtualHost = /message-test/
    
    三、AmqpConfigrationの配置ファイルを作成する
    
    package message.test.configuration;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class AmqpConfiguration {
    /**
     *     
     */
     public static final String MESSAGE_ENCODING = "UTF-8";
     public static final String EXCHANGE_ISSUE = "exchange_message_issue";
     public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
     public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
     public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
     public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";
     public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
     public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
     public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
     public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";
     public static final String EXCHANGE_PUSH = "exchange_message_push";
     public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";
    
     @Autowired
     private RabbitProperties rabbitProperties;
    
     @Bean
     public Queue issueUserQueue() {
      return new Queue(QUEUE_ISSUE_USER);
     }
    
     @Bean
     public Queue issueAllUserQueue() {
      return new Queue(QUEUE_ISSUE_ALL_USER);
     }
    
     @Bean
     public Queue issueAllDeviceQueue() {
      return new Queue(QUEUE_ISSUE_ALL_DEVICE);
     }
    
     @Bean
     public Queue issueCityQueue() {
      return new Queue(QUEUE_ISSUE_CITY);
     }
    
     @Bean
     public Queue pushResultQueue() {
      return new Queue(QUEUE_PUSH_RESULT);
     }
    
     @Bean
     public DirectExchange issueExchange() {
      return new DirectExchange(EXCHANGE_ISSUE);
     }
    
     @Bean
     public DirectExchange pushExchange() {
      //   1:  
      //   2:     
      //   3:      
      return new DirectExchange(EXCHANGE_PUSH, true, true);
     }
    
     @Bean
     public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
     }
    
     @Bean
     public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
     }
    
     @Bean
     public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
     }
    
     @Bean
     public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
        @Qualifier("issueExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
     }
    
     @Bean
     public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
        @Qualifier("pushExchange") DirectExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).withQueueName();
     }
    
     @Bean
     public ConnectionFactory defaultConnectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setHost(rabbitProperties.getHost());
      connectionFactory.setPort(rabbitProperties.getPort());
      connectionFactory.setUsername(rabbitProperties.getUsername());
      connectionFactory.setPassword(rabbitProperties.getPassword());
      connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
      return connectionFactory;
     }
    
     @Bean
     public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory);
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory;
     }
    
     @Bean
     public AmqpTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) 
     {
      return new RabbitTemplate(connectionFactory);
     }
    }
    
    
    三、編纂生産者
    
    body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
     rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
                AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
    四、消費者を編纂する
    
    @RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
    public void handlePushResult(@Payload byte[] data, Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
    }
    
    以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。