RabbiitMQ第5編:Spring集積RabbiitMQ


前のいくつかの説明はラビットMqの使い方を説明していますが、この本は主にスプリング統合ラビットmqを解説しています.
まずプロファイルorg.springframe ework.amqpを紹介します.以下の通りです.

    org.springframework.amqp
    spring-rabbit
    1.7.3.RELEASE


    org.codehaus.jackson
    jackson-mapper-asl
    1.9.13

一:消費者と生成者の公共部分を配置する
rabbiitmq.properties
rabbit.hosts=10.50.200.234
rabbit.username=june
rabbit.password=june
rabbit.port=5672
rabbit.virtualHost=/
#   XML          
rabbit.queue=rabbitmq_test
amqp-share.xml

<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:context="http://www.springframework.org/schema/context"
     xmlns:util="http://www.springframework.org/schema/util"
     xmlns:aop="http://www.springframework.org/schema/aop"
     xmlns:tx="http://www.springframework.org/schema/tx"
     xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     xmlns:p="http://www.springframework.org/schema/p"
     xsi:schemaLocation="
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-4.1.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-4.1.xsd
     http://www.springframework.org/schema/aop
     http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
     http://www.springframework.org/schema/tx
     http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <util:properties id="appConfig" location="classpath:rabbitmq.properties">util:properties>

    <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}"
                               port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.virtualHost}"
                               channel-cache-size="50"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    
    <rabbit:queue name="spittle.alert.queue.1" durable="true" auto-delete="false"/>
    <rabbit:queue name="spittle.alert.queue.2" durable="true" auto-delete="false"/>
    <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/>
    
    <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="spittle.alert.queue.1">rabbit:binding>
            <rabbit:binding queue="spittle.alert.queue.2">rabbit:binding>
            <rabbit:binding queue="spittle.alert.queue.3">rabbit:binding>
        rabbit:bindings>
    rabbit:fanout-exchange>

beans>
二:構成生成者

<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:context="http://www.springframework.org/schema/context"
     xmlns:util="http://www.springframework.org/schema/util"
     xmlns:aop="http://www.springframework.org/schema/aop"
     xmlns:tx="http://www.springframework.org/schema/tx"
     xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     xmlns:p="http://www.springframework.org/schema/p"
     xsi:schemaLocation="
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-4.1.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-4.1.xsd
     http://www.springframework.org/schema/aop
     http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
     http://www.springframework.org/schema/tx
     http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <import resource="amqp-share.xml"/>

    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">bean>
    
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
                     exchange="spittle.fanout" message-converter="jsonMessageConverter" />

beans>
三:生産者プログラム
/**
 * june.mq:com.june.mq.rabbit.spring.Spittle.java
 *   :2017 7 13 
 */
package com.june.mq.rabbit.spring;

import java.io.Serializable;
import java.util.Date;

/**
 * Spittle 
* * @author [email protected] * @blog https://www.github.com/junehappylove * @date 2017 7 13 3:42:21 * @version 1.0.0 */
public class Spittle implements Serializable { private static final long serialVersionUID = 1L; private Long id; private Spitter spitter; private String message; private Date postedTime; public Spittle(Long id, Spitter spitter, String message, Date postedTime) { this.id = id; this.spitter = spitter; this.message = message; this.postedTime = postedTime; } public Long getId() { return this.id; } public String getMessage() { return this.message; } public Date getPostedTime() { return this.postedTime; } public Spitter getSpitter() { return this.spitter; } } /** * june.mq:com.june.mq.rabbit.spring.ProducerMain.java * :2017 7 13 */ package com.june.mq.rabbit.spring; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * ProducerMain
* * @author [email protected] * @blog https://www.github.com/junehappylove * @date 2017 7 13 3:44:02 * @version 1.0.0 */
public class ProducerMain { private static ApplicationContext context; /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml"); AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate"); for (int i = 0; i < 10; i++) { System.out.println("Sending message #" + i); Spittle spittle = new Spittle((long) i, null, "Hello world (" + i + ")", new Date()); template.convertAndSend(spittle); Thread.sleep(3000); } System.out.println("Done!"); } }
convertAndSend方法のデフォルトでは、最初のパラメータはスイッチ名であり、第二のパラメータはルート名であり、第三のパラメータは私たちが送ったデータである.
今プログラムを起動します.効果は以下の通りです.
   20, 2017 4:01:04    org.springframework.context.support.ClassPathXmlApplicationContext prepareRefresh
  : Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@22d8cfe0: startup date [Thu Jul 20 16:01:04 CST 2017]; root of context hierarchy
   20, 2017 4:01:04    org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
  : Loading XML bean definitions from class path resource [amqp/amqp-producer.xml]
   20, 2017 4:01:04    org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
  : Loading XML bean definitions from class path resource [amqp/amqp-share.xml]
   20, 2017 4:01:04    org.springframework.context.support.DefaultLifecycleProcessor start
  : Starting beans in phase -2147482648
Sending message #0
   20, 2017 4:01:04    org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
  : Created new connection: connectionFactory#2638011:0/SimpleConnection@c03cf28 [delegate=amqp://[email protected]:5672/, localPort= 57422]
Sending message #1
Sending message #2
Sending message #3
Sending message #4
Sending message #5
Sending message #6
Sending message #7
Sending message #8
Sending message #9
Done!
四:消費者プログラム
まず生産者の情報を傍受するためのコードを作成します.
package com.june.mq.rabbit.spring;

import java.io.UnsupportedEncodingException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * 
 * SpittleAlertHandler 
* * @author [email protected] * @blog https://www.github.com/junehappylove */
public class SpittleAlertHandler implements MessageListener { @Override public void onMessage(Message message) { try { String body = new String(message.getBody(), "UTF-8"); System.out.println(body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }
MessageListenerを実現するためには、メッセージのbodyを取得するだけでいいです.jsonを通じて必要なプログラムを変換します.
私たちは消費者を配置します.

<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:context="http://www.springframework.org/schema/context"
     xmlns:util="http://www.springframework.org/schema/util"
     xmlns:aop="http://www.springframework.org/schema/aop"
     xmlns:tx="http://www.springframework.org/schema/tx"
     xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     xmlns:p="http://www.springframework.org/schema/p"
     xsi:schemaLocation="
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-4.1.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-4.1.xsd
     http://www.springframework.org/schema/aop
     http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
     http://www.springframework.org/schema/tx
     http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <import resource="amqp-share.xml"/>
    <rabbit:listener-container connection-factory="connectionFactory">
      <rabbit:listener ref="spittleListener" method="onMessage" queues="spittle.alert.queue.1,spittle.alert.queue.3,spittle.alert.queue.2"/>
    rabbit:listener-container>
    <bean id="spittleListener" class="com.june.mq.rabbit.spring.SpittleAlertHandler"/>

beans>
spittleListenerは傍受のプログラムです.methodは実行の方法です.queuesは私達が傍受する列です.複数の列はコンマで区切られています.
今はプログラムを起動するだけで実行できます.
/**
 * june.mq:com.june.mq.rabbit.spring.ConsumerMain.java
 *   :2017 7 13 
 */
package com.june.mq.rabbit.spring;

import org.springframework.context.ApplicationContext;

/**
 * ConsumerMain 
* * @author [email protected] * @blog https://www.github.com/junehappylove */
public class ConsumerMain { static ApplicationContext context = null; /** * @param args */ public static void main(String[] args) { context = new org.springframework.context.support.ClassPathXmlApplicationContext("amqp/amqp-consumer.xml"); } }
プログラム実行結果
   20, 2017 4:09:05    org.springframework.context.support.ClassPathXmlApplicationContext prepareRefresh
  : Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@22d8cfe0: startup date [Thu Jul 20 16:09:05 CST 2017]; root of context hierarchy
   20, 2017 4:09:05    org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
  : Loading XML bean definitions from class path resource [amqp/amqp-consumer.xml]
   20, 2017 4:09:05    org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
  : Loading XML bean definitions from class path resource [amqp/amqp-share.xml]
   20, 2017 4:09:06    org.springframework.context.support.DefaultLifecycleProcessor start
  : Starting beans in phase -2147482648
   20, 2017 4:09:06    org.springframework.context.support.DefaultLifecycleProcessor start
  : Starting beans in phase 2147483647
   20, 2017 4:09:06    org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
  : Created new connection: connectionFactory#40727802:0/SimpleConnection@5cf96df5 [delegate=amqp://[email protected]:5672/, localPort= 57477]
{"id":0,"spitter":null,"message":"Hello world (0)","postedTime":1500537664622}
{"id":0,"spitter":null,"message":"Hello world (0)","postedTime":1500537664622}
{"id":0,"spitter":null,"message":"Hello world (0)","postedTime":1500537664622}
{"id":1,"spitter":null,"message":"Hello world (1)","postedTime":1500537667743}
{"id":1,"spitter":null,"message":"Hello world (1)","postedTime":1500537667743}
{"id":1,"spitter":null,"message":"Hello world (1)","postedTime":1500537667743}
{"id":2,"spitter":null,"message":"Hello world (2)","postedTime":1500537670744}
{"id":2,"spitter":null,"message":"Hello world (2)","postedTime":1500537670744}
{"id":2,"spitter":null,"message":"Hello world (2)","postedTime":1500537670744}
{"id":3,"spitter":null,"message":"Hello world (3)","postedTime":1500537673745}
{"id":3,"spitter":null,"message":"Hello world (3)","postedTime":1500537673745}
{"id":3,"spitter":null,"message":"Hello world (3)","postedTime":1500537673745}
{"id":4,"spitter":null,"message":"Hello world (4)","postedTime":1500537676747}
{"id":4,"spitter":null,"message":"Hello world (4)","postedTime":1500537676747}
{"id":4,"spitter":null,"message":"Hello world (4)","postedTime":1500537676747}
{"id":5,"spitter":null,"message":"Hello world (5)","postedTime":1500537679748}
{"id":5,"spitter":null,"message":"Hello world (5)","postedTime":1500537679748}
{"id":5,"spitter":null,"message":"Hello world (5)","postedTime":1500537679748}
{"id":6,"spitter":null,"message":"Hello world (6)","postedTime":1500537682748}
{"id":6,"spitter":null,"message":"Hello world (6)","postedTime":1500537682748}
{"id":6,"spitter":null,"message":"Hello world (6)","postedTime":1500537682748}
{"id":7,"spitter":null,"message":"Hello world (7)","postedTime":1500537685748}
{"id":7,"spitter":null,"message":"Hello world (7)","postedTime":1500537685748}
{"id":7,"spitter":null,"message":"Hello world (7)","postedTime":1500537685748}
{"id":8,"spitter":null,"message":"Hello world (8)","postedTime":1500537688748}
{"id":8,"spitter":null,"message":"Hello world (8)","postedTime":1500537688748}
{"id":8,"spitter":null,"message":"Hello world (8)","postedTime":1500537688748}
{"id":9,"spitter":null,"message":"Hello world (9)","postedTime":1500537691748}
{"id":9,"spitter":null,"message":"Hello world (9)","postedTime":1500537691748}
{"id":9,"spitter":null,"message":"Hello world (9)","postedTime":1500537691748}
もちろんdirectは上の状況と同じです.ただ、これはルートマッチングによって、まずデータを交換機に送ります.そして、ルートとキューを結びつけて、スイッチIDと道の由来を通じてキューを見つけます.以下はいくつかの主要な配置です.
ampq-share.xml
<rabbit:queue id="spring-test-queue1" durable="true" auto-delete="false" exclusive="false" name="spring-test-queue1">rabbit:queue>
<rabbit:queue name="spring-test-queue2" durable="true" auto-delete="false" exclusive="false">rabbit:queue>


<rabbit:direct-exchange name="${rabbit.exchange.direct}" durable="true" auto-delete="false" id="${rabbit.exchange.direct}">
    <rabbit:bindings>
        <rabbit:binding queue="spring-test-queue1" key="spring.test.queueKey1"/>
        <rabbit:binding queue="spring-test-queue2" key="spring.test.queueKey2"/>
  rabbit:bindings>
rabbit:direct-exchange>
amqp-producer.xml

<rabbit:template exchange="${rabbit.exchange.direct}" id="rabbitTemplate" connection-factory="connectionFactory"
message-converter="jsonMessageConverter">rabbit:template>

<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">bean>
以下は消費者モニターの配置です.
amqp-consumer.xml
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <rabbit:listener queues="spring-test-queue1" method="onMessage" ref="queueListenter">rabbit:listener>
rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
    <rabbit:listener queues="spring-test-queue2" method="onMessage" ref="queueListenter">rabbit:listener>
rabbit:listener-container>
次はプログラムです
public static void main(String[] args) {
    ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml");
    MQProducer mqProducer = (MQProducer) context.getBean("mqProducer");
    mqProducer.sendDataToQueue("spring.test.queueKey1", "Hello World spring.test.queueKey1");
    mqProducer.sendDataToQueue("spring.test.queueKey2", "Hello World spring.test.queueKey2");
}
実際の状況は、消費者と生成者を分離するための手順が必要かもしれません.もちろんspringには負荷バランスの設定がありますが、ここではあまり紹介しません.
          ,      

                                            ---- 2017-07-20 junehappylove