Spring BootはRabbiitMQとRabbiitMQを使って紹介します.

12845 ワード

title: SpringBoot MQ    
date: 2019-06-24
author: maxzhao
tags:
  - SpringBoot
  - MQ
categories:
  - DevelopTools
  - SpringBoot
  -    
SpringBootはRabbiitMQを使用しています.
ラビットMQは
  • メッセージキュー
  • は、AMQP(高級メッセージキュープロトコルAdvanced Message Queuing Protocol)を実装するメッセージ中間の
  • の一つである.
    作用:主にアプリケーションの非同期と結合を実現するために用いられ、同時にメッセージバッファ、メッセージ配信の役割も果たすことができる.
    プロセス:
    一般的なメッセージ・キューは、生産者がメッセージをキューに送り、消費者がモニターして消費するものです.
    rabbiitmqのうちの一つの仮想ホスト(vhostデフォルト/)は、一つ以上のスイッチ(Exchange)を持っている.ユーザーは仮想ホストの粒度のみで権限制御を行うことができ、スイッチは、あるポリシーに従ってキューにバインディング(Binding)し、このように生産者とキューは直接に連絡せずに、メッセージを送信するスイッチであり、スイッチは、メッセージをバインディングされたキューに転送する.
    スイッチ(Exchange)はrabbitmq独特の概念で、最もよく使われるのは4つのタイプです.
  • Direct:まずマッチングしてから投げます.バインディング時にrouting_を設定します.key、メッセージのroutingkeyがマッチングすると、スイッチがバインディングされたキューに送られます.スイッチとキューは正確な対応関係でなければなりません.これが一番簡単です.
  • Topic:転送メッセージは主にワイルドカードによるものです.このようなスイッチの下で、キューとスイッチのバインディングはルーティングモードを定義し、このようなルーティングモードとルーティングキーとの間で一致した後にスイッチがメッセージを転送することができる.これはDirectのフレキシブルバージョン
  • と考えられる.
  • Headers:ルールによってもマッチングし、directとtopicより固定的にroutingkeyを使用し、headersはカスタムマッチングルールのタイプであり、キューとスイッチが結合されたときにキーペアルールを設定し、メッセージにはキーペアまたはすべてのマッチングがあるときにキー値ペアが設定されます.メッセージは対応するキュー
  • に送られる.
  • Fanoout:メッセージブロードキャストモードは、ルーティングキーまたはルーティングモードにかかわらず、メッセージをそのすべてのキューにバインドし、routingkeyを設定すると、
  • を無視することができる.
    例をあげて説明する
    2つのスイッチdirectExchangefanoutExchange、3つのキューqueueAqueueBqueueC.
    キューdirectExchangeは、ポイントとして送信され、キューA Bを含む.
    キューfanoutExchangeは、ブロードキャストとして送信され、キューA B Cを含む.
    @Configuration
    public class RabbitConfig {
        @Resource
        private RabbitTemplate rabbitTemplate;
        /**
         *    Direct          .
         *
         * @return the exchange
         */
        @Bean("directExchange")
        public Exchange directExchange() {
            return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
        }
        /**
         *    fanout    .
         *
         * @return the exchange
         */
        @Bean("fanoutExchange")
        public FanoutExchange fanoutExchange() {
            return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
        }
        /**
         *             .
         *
         * @return the queue
         */
        @Bean("queueA")
        public Queue directQueue() {
            return QueueBuilder.durable("QUEUE_A").build();
        }
        @Bean("queueB")
        public Queue directQueue() {
            return QueueBuilder.durable("QUEUE_B").build();
        }
        @Bean("queueC")
        public Queue directQueue() {
            return QueueBuilder.durable("QUEUE_C").build();
        }
        /**
         *     A   direct    .
         *
         * @param queue          the queue
         * @param exchange       the  exchange
         * @return the binding
         */
        @Bean
        public Binding bindingA(@Qualifier("queueA") Queue queue,
                                @Qualifier("directExchange") exchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_A").noargs();
        }
        @Bean
        public Binding bindingA(@Qualifier("queueB") Queue queue,
                                @Qualifier("directExchange") exchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_B").noargs();
        }
        /**
         *     A   fanout    .
         *
         * @param queue          the queue
         * @param exchange       the  exchange
         * @return the binding
         */
        @Bean
        public Binding bindingA(@Qualifier("queueA") Queue queue,
                                @Qualifier("fanoutExchange") exchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
        @Bean
        public Binding bindingA(@Qualifier("queueB") Queue queue,
                                @Qualifier("fanoutExchange") exchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange).;
        }
        @Bean
        public Binding bindingA(@Qualifier("queueC") Queue queue,
                                @Qualifier("fanoutExchange") exchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange).;
        }
    
    }
    
    メッセージ送信クラス
    @Service
    public class SenderService {
        private Logger logger = LoggerFactory.getLogger(this.getClass());
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        /**
         *       .
         *
         * @param p the p
         * @return the response entity
         */
        public void broadcast(String p) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
        }
    
        /**
         *    Direct   .
         *
         * @param p the p
         * @return the response entity
         */
        public void directA(String p) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_A", p, correlationData);
        }
        public void directB(String p) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_B", p, correlationData);
        }
        public void directNull(String p) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "", p, correlationData);
        }
    
    }
    
    
    メッセージ受信クラス
    @Component
    public class Receiver {
        private static final Logger log = LoggerFactory.getLogger(Receiver.class);
    
        /**
         * FANOUT       .
         *
         * @param message the message
         * @param channel the channel
         * @throws IOException the io exception          
         */
        @RabbitListener(queues = {"QUEUE_A"})
        public void on(Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
        }
        @RabbitListener(queues = {"QUEUE_B"})
        public void t(Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
        }
        @RabbitListener(queues = {"QUEUE_C"})
        public void t(Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.debug("FANOUT_QUEUE_C " + new String(message.getBody()));
        }
    
        /**
         * DIRECT  .
         *
         * @param message the message
         * @param channel the channel
         * @throws IOException the io exception          
         */
        @RabbitListener(queues = {"DIRECT_QUEUE"})
        public void message(Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.debug("DIRECT_QUEUE " + new String(message.getBody()));
        }
    }
    
    テストクラス
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    public class SenderServiceTest {
        @Autowired
        private SenderService senderService;
    
        @Test
        public void testCache() throws InterruptedException {
            //       
            senderService.broadcast("   Test          !");
            //   Direct  
            senderService.directA("   Test         A ");
            senderService.directB("   Test         B ");
            senderService.directNull("   Test         null key ");
            Thread.sleep(5000L);
        }
    }
    
    
    結果
    DIRECT_QUEUE_A "   Test          !"
    FANOUT_QUEUE_B "   Test          !"
    FANOUT_QUEUE_C "   Test          !"
    DIRECT_QUEUE_A"   Test         A "
    DIRECT_QUEUE_B"   Test         B "
    
    null keyは現れていないので、directExchangeにはブロードキャスト可能なキューがありません.
    Maven依存
        
            org.springframework.boot
            spring-boot-starter-amqp
        
    
    設定
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: maxzhao
        password: maxzhao
        #      
        publisher-confirms: true 
        #      
        publisher-returns: true      
        #    /
        virtual-host: maxzhao_vhost 
        listener:
          simple:
            acknowledge-mode: manual #      
            concurrency: 1 #          
            max-concurrency: 1 #          
            retry:
              enabled: true #      
    
    その他の参考
    JMS紹介と使用シーン
    プロフィール:
    ニュースキューとJMSの基礎知識と使用シーンを説明します.
  • は、JMS:Javaメッセージサービス(Java Message Service)とは、Javaプラットフォームにおけるメッセージ中間デバイス向けインタフェース
  • です.
  • JMSは、JDBC(Java Database Connective ity)に類似したメッセージ授受システムメッセージにアクセスするための、ベンダーとは無関係のAPIである.ここで、JDBCは、多くの異なる関係データベースにアクセスできるAPI
  • である.
  • 使用シーン:
  • クロスプラットフォーム
  • 多言語
  • 多項目
  • デカップリング
  • 分散式事務
  • 流量制御
  • 最終一致性
  • RPC呼び出し
  • ダウンストリームドッキング、データソース変動->通知
  • 概念
  • JMSプロバイダ:Apache ActiveMQ、ラビットMQ、Kafka、Notify、MetaQ、RocketMQ
  • JMS生産者(Message Producer)
  • JMS消費者(Message Consmer)
  • JMSメッセージ
  • JMSキュー
  • JMSテーマ
  • JMSメッセージは、通常2つのタイプがあります.ポイントペア、リリース/購読(Publish/Subscribe
  • )
    専門用語
  • Broker-簡単に言えば、メッセージキューサーバのエンティティです.
  • Exchange-メッセージルータは、メッセージをバインディングされたキューに転送し、メッセージをどのルールでどの列にルーティングするかを指定します.
  • Queueue-メッセージ・キューは、メッセージを格納するために使用され、各メッセージは1つまたは複数の列に投入されます.
  • Binding-バインディングは、ExchangeとQueueをルーティング規則に従って結びつける役割を果たします.
  • RoutingKey-ルーティングキー、Exchangeはこのキーワードに基づいてメッセージを配信します.
  • Producter-メッセージ生産者、メッセージ生成プログラム.
  • Consmer-メッセージ消費者、メッセージ受信プログラム.
  • Channel-メッセージチャネルは、クライアントの各接続に複数のChannelを確立することができます.各チャンネルはセッションを表します.
  • プログラミングモデル
  • Connection Factory:工場に接続し、JMSはそれで
  • を作成します.
  • Connection:JMSクライアントからJMS Providerへの接続
  • Session:メッセージを送信または受信するスレッド
  • Destination:メッセージの宛先、メッセージは誰に送信されますか?
  • Message Consmer/Message Producer:メッセージ受信者、消費者
  • ラビットMQ
    RabbiitMQは優れたメッセージングエージェントです.メッセージを受け取り、転送します.これを郵便局と見なしてもいいです.自分の手紙を宛先の住所に書いてポストに入れると大丈夫です.郵便局が責任を持って目的地に送ります.
    一、据え付ける
    ここで使っているのはArchLInuxです.
    sudo pacman -S rabbitmq
    #       socat  erlang-nox rabbitmq 
    
    RPM実装
    #    erlang
    rpm -Uvh https://mirrors.ustc.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm
    yum install erlang
    #  RabbitMQ
    rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
    wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
    wget ftp://195.220.108.108/linux/centos/7.4.1708/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm
    yum localinstall -C -y --disablerepo=* *.rpm
    
    web管理画面のインストール
    sudo rabbitmq-plugins enable rabbitmq_management
    #        
    sudo rabbitmq-plugins list
    
    起動
    #         
    systemctl enable rabbitmq-server
    #   RabbitMQ
    systemctl start rabbitmq-server
    #   
    systemctl restart rabbitmq-server
    
    設定
    vim /etc/rabbitmq/rabbitmq.config
    
    設定ポート、備考:メッセージポート5672であれば、ウェブアクセスポートは15672である.
    [
        {rabbit,
          [
            {loopback_users, []},
            {tcp_listeners, [5672]}
          ]
        }
    ]
    
    ユーザー管理
    #   guest   
    sudo rabbitmqctl list_users
    sudo rabbitmqctl change_password guest guest
    
    #            test/test:
    sudo rabbitmqctl add_user maxzhao maxzhao
    # 
    sudo rabbitmqctl set_user_tags maxzhao administrator
    # /  vhost     Configure regexp   Write regexp  Read regexp
    sudo rabbitmqctl set_permissions -p / maxzhao ".*" ".*" ".*"
    # Sets user topic permissions for an exchange,     AMQP default  exchange
    # sudo rabbitmqctl set_topic_permissions
    
    vhostを追加
    #     
    sudo rabbitmqctl  --help
    #      vhost    
    sudo rabbitmqctl add_vhost --help
    #    
    sudo rabbitmqctl add_vhost maxzhao_vhost
    #   
    sudo rabbitmqctl  list_vhosts
    #   
    sudo rabbitmqctl set_permissions -p /maxzhao_vhost maxzhao ".*" ".*" ".*"
    
    
    vhostを削除
    sudo rabbitmqctl add_vhost maxzhaoTest
    sudo rabbitmqctl delete_vhost maxzhaoTest
    
    二、タスクのキューなどの参照
    本論文の住所:SpringBootはRabbiitMQ及びRabbiitMQを使用して紹介する.
    おすすめ
    公式の例
    下の2017年に書いたのですが、比較的全面的な新米教材です.
    RabbiitMQ簡易教程-タスクキュー
    RabbiitMQ簡易教程-購読を発表します.
    RabbiitMQ簡易教程-ルート
    RabbiitMQ簡易教程-テーマ
    RabbiitMQ簡易教程-RPC
    RabbitMQ簡易教程-Websocket
    RabbiitMQ簡易教程-併発スケジュール