spring hornetq selectorフィルタメッセージ
19098 ワード
JMSメッセージを受信する際、キュー内のメッセージの中から必要なものだけを選別し、不要なメッセージを捨てたいことがよくあります.その場合はJMSのselector機能を使います.ここではspring 3.1と組み合わせた例を示します.
メッセージの送信の設定:
メッセージを送るSenderクラス
メッセージを送るメイン関数クラス
使用するインターフェースクラス:
----------------------------------受信メッセージを開始する-------------------
受信側のspring設定ファイル:
MessageListenerインターフェースを実装したクラス:
メーン関数を含むブートクラス:
------------備考:
上記の受信側のspring設定ファイルは、JMS Namespace Supportを使って別の形で記述することもできます.その場合、destination="org.spring.jms.selector.queue" にはqueueの名前を指定します.beanへの参照ではありません.
設定ファイルは以下の通りです.
メッセージの送信の設定:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- Sender-side context: wires a JmsTemplate and the Sender bean to a HornetQ queue. -->
<!-- Queue object built by the static factory HornetQJMSClient.createQueue; the argument is the queue name. -->
<bean id="selectorQueue" class="org.hornetq.api.jms.HornetQJMSClient"
factory-method="createQueue">
<constructor-arg value="org.spring.jms.selector.queue" />
</bean>
<!-- Transport settings: Netty connector factory class name plus a host/port parameter map. -->
<bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
<constructor-arg
value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
<constructor-arg>
<map key-type="java.lang.String" value-type="java.lang.Object">
<entry key="host" value="localhost"></entry>
<entry key="port" value="5445"></entry>
</map>
</constructor-arg>
</bean>
<!-- Connection factory built by HornetQJMSClient.createConnectionFactoryWithoutHA
     from the factory type CF and the transport configuration above. -->
<bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
factory-method="createConnectionFactoryWithoutHA">
<constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
value="CF" />
<constructor-arg ref="transportConfiguration" />
</bean>
<!-- Spring JmsTemplate that sends through the connection factory above. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!-- Application bean looked up by BootstrapSender under the id "sender". -->
<bean id="sender" class="com.wanmei.jms.spring.selector.config.sender.Sender">
<property name="jmsTemplate" ref="jmsTemplate" />
<property name="destination" ref="selectorQueue" />
</bean>
</beans>
メッセージを送るSenderクラス
/**
* <pre>
* </pre>
*/
package com.wanmei.jms.spring.selector.config.sender;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import com.wanmei.jms.spring.selector.java.State;
/**
* <pre>
* date 2012-12-20
* </pre>
*/
public class Sender implements State {
private static final Logger LOGGER = Logger.getLogger(Sender.class);
private JmsTemplate jmsTemplate;
private Destination destination;
public void send(final String message, final String fromNode,
final String toNode) {
try {
LOGGER.info("start to send message to " + destination
+ " [message:" + message + ",fromNode:" + fromNode
+ ",toNode:" + toNode);
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session)
throws JMSException {
LOGGER.info("session:"+session + "
message : " + message);
TextMessage msg = session.createTextMessage(message);
msg.setStringProperty(FROM_NODE, fromNode);
msg.setStringProperty(TO_NODE, toNode);
LOGGER.info("-->"+msg);
LOGGER.info(TO_NODE+"-->"+toNode);
return msg;
}
});
LOGGER.info("send message to " + destination + " successfully!");
} catch (Throwable t) {
LOGGER.error("Error:" + t.getMessage(), t);
}
}
// ----------------- setter / getter
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
public static Logger getLogger() {
return LOGGER;
}
}
メッセージを送るメイン関数クラス
/**
* <pre>
* </pre>
*/
package com.wanmei.jms.spring.selector.config.sender;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
/**
 * Command-line entry point of the sending side: loads the Spring context,
 * looks up the {@code sender} bean and sends a fixed number of sample
 * messages whose from/to nodes alternate between two addresses.
 *
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class BootstrapSender {
    private static final Logger LOGGER = Logger
            .getLogger(BootstrapSender.class);

    /** Context file used when no location is given on the command line. */
    private static final String DEFAULT_CONFIG_LOCATION = "E:/workspace_java/hornetq/src/com/wanmei/jms/spring/selector/config/sender/applicationContext.xml";

    /** Number of sample messages sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Starts the Spring context, sends the sample messages and closes the
     * context again.
     *
     * @param args optional; {@code args[0]} overrides the location of the
     *             Spring context file, otherwise the built-in default
     *             location is used
     */
    public static void main(String[] args) {
        LOGGER.info("start to work and initialize spring frame.");
        String configLocation = (args != null && args.length > 0)
                ? args[0]
                : DEFAULT_CONFIG_LOCATION;
        FileSystemXmlApplicationContext applicationContext = new FileSystemXmlApplicationContext(
                configLocation);
        try {
            LOGGER.info("initialize spring frame successfully!");
            Sender sender = applicationContext.getBean("sender", Sender.class);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                Msg msg = createMessage(i);
                sender.send(msg.getMessage() + "-" + i, msg.getFromNode(), msg.getToNode());
            }
        } finally {
            // Close the context once sending is done so its beans are
            // destroyed instead of being left open until the JVM exits.
            applicationContext.close();
        }
    }

    /**
     * Builds the sample message for index {@code i}: even indexes go from
     * 127.0.0.2 to 127.0.0.1, odd indexes the other way round.
     */
    private static Msg createMessage(int i) {
        String base1 = "127.0.0.1";
        String base2 = "127.0.0.2";
        if (i % 2 == 0) {
            return new Msg(message(base2), from(base2), to(base1));
        }
        return new Msg(message(base1), from(base1), to(base2));
    }

    /** Prefixes {@code str} with {@code "send "}. */
    private static String message(String str) {
        return "send " + str;
    }

    /** Prefixes {@code str} with {@code "from "}. */
    private static String from(String str) {
        return "from " + str;
    }

    /** Prefixes {@code str} with {@code "to "}. */
    private static String to(String str) {
        return "to " + str;
    }
}
/**
 * Mutable holder for one sample message: its text plus the sending and
 * receiving node labels.
 */
class Msg {
    private String message;
    private String fromNode;
    private String toNode;

    /** Creates an instance whose three fields are all {@code null}. */
    public Msg() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param message  message text
     * @param fromNode label of the sending node
     * @param toNode   label of the receiving node
     */
    public Msg(String message, String fromNode, String toNode) {
        this.message = message;
        this.fromNode = fromNode;
        this.toNode = toNode;
    }

    // ----------------- getters

    /** @return the message text */
    public String getMessage() {
        return this.message;
    }

    /** @return the label of the sending node */
    public String getFromNode() {
        return this.fromNode;
    }

    /** @return the label of the receiving node */
    public String getToNode() {
        return this.toNode;
    }

    // ----------------- setters

    /** @param message the message text */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @param fromNode the label of the sending node */
    public void setFromNode(String fromNode) {
        this.fromNode = fromNode;
    }

    /** @param toNode the label of the receiving node */
    public void setToNode(String toNode) {
        this.toNode = toNode;
    }
}
使用するインターフェースクラス:
/**
* <pre>
* </pre>
*/
package com.wanmei.jms.spring.selector.java;
/**
 * Names of the JMS string properties shared by the sending and receiving
 * classes.
 *
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public interface State {
    /** Name of the property holding the sending node. */
    String FROM_NODE = "FROM_NODE";

    /** Name of the property holding the receiving node. */
    String TO_NODE = "TO_NODE";
}
----------------------------------受信メッセージを開始する-------------------
受信側のspring設定ファイル:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- Receiver-side context: a listener container that consumes from the same queue through a message selector. -->
<!-- Queue object built by the static factory HornetQJMSClient.createQueue; the argument is the queue name. -->
<bean id="selectorQueue" class="org.hornetq.api.jms.HornetQJMSClient"
factory-method="createQueue">
<constructor-arg value="org.spring.jms.selector.queue" />
</bean>
<!-- Transport settings: Netty connector factory class name plus a host/port parameter map. -->
<bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
<constructor-arg
value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
<constructor-arg>
<map key-type="java.lang.String" value-type="java.lang.Object">
<entry key="host" value="localhost"></entry>
<entry key="port" value="5445"></entry>
</map>
</constructor-arg>
</bean>
<!-- Connection factory built by HornetQJMSClient.createConnectionFactoryWithoutHA
     from the factory type CF and the transport configuration above. -->
<bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
factory-method="createConnectionFactoryWithoutHA">
<constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
value="CF" />
<constructor-arg ref="transportConfiguration" />
</bean>
<!-- MessageListener implementation that logs each received message. -->
<bean id="receiveListener" class="com.wanmei.jms.spring.selector.java.ReceiveListener"/>
<!-- Listener container: "destination" is a reference to the queue bean above.
     The messageSelector expression limits delivery to messages whose TO_NODE
     string property equals 'to 127.0.0.1'. -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="selectorQueue"/>
<property name="messageListener" ref="receiveListener"/>
<property name="messageSelector" value="TO_NODE='to 127.0.0.1'"/>
</bean>
</beans>
MessageListenerインターフェースを実装したクラス:
/**
* <pre>
* </pre>
*/
package com.wanmei.jms.spring.selector.java;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
/**
 * JMS listener that logs the destination, text body and the
 * {@code FROM_NODE} / {@code TO_NODE} string properties of each text
 * message it receives.
 *
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class ReceiveListener implements MessageListener, State {
    private static final Logger LOGGER = Logger
            .getLogger(ReceiveListener.class);

    /**
     * Logs the content of a received text message.
     * <p>
     * Messages that are not {@link TextMessage}s are logged as a warning
     * and skipped, so they no longer cause a {@code ClassCastException}.
     * A {@link JMSException} raised while reading the message is logged
     * and not rethrown.
     *
     * @param msg the message handed over by the JMS provider
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message msg) {
        if (!(msg instanceof TextMessage)) {
            LOGGER.warn("ignore non-text message : " + msg);
            return;
        }
        TextMessage message = (TextMessage) msg;
        try {
            String fromNode = message.getStringProperty(FROM_NODE);
            String toNode = message.getStringProperty(TO_NODE);
            LOGGER.info("receive message from " + message.getJMSDestination()
                    + ", msg : " + message.getText() + ", fromNode : " + fromNode
                    + ", toNode : " + toNode);
        } catch (JMSException e) {
            LOGGER.error("Error" + e.getMessage(), e);
        }
    }
}
メーン関数を含むブートクラス:
/**
* <pre>
* </pre>
*/
package com.wanmei.jms.spring.selector;
import org.apache.log4j.Logger;
import org.springframework.context.support.FileSystemXmlApplicationContext;
/**
 * Command-line entry point of the receiving side: loads the Spring context
 * file from the file system and leaves the context running.
 *
 * <pre>
 * date 2012-12-20
 * </pre>
 */
public class Bootstrap {
    private static final Logger LOGGER = Logger
            .getLogger(Bootstrap.class);

    /** Context file used when no location is given on the command line. */
    private static final String DEFAULT_CONFIG_LOCATION = "E:/workspace_tmp_copy/hornetq/src/com/wanmei/jms/spring/selector/config/receiver/c1/applicationContext.xml";

    /**
     * Starts the Spring context and registers a JVM shutdown hook that
     * closes it when the process terminates.
     *
     * @param args optional; {@code args[0]} overrides the location of the
     *             Spring context file, otherwise the built-in default
     *             location is used
     */
    public static void main(String[] args) {
        LOGGER.info("start to work and initialize spring frame.");
        String configLocation = (args != null && args.length > 0)
                ? args[0]
                : DEFAULT_CONFIG_LOCATION;
        FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext(
                configLocation);
        // The context is not closed here, so nothing would otherwise close
        // it before the JVM exits.
        context.registerShutdownHook();
        LOGGER.info("initialize spring frame successfully!");
        LOGGER.info("spring : " + configLocation);
    }
}
------------備考:
上記の受信側のspring設定ファイルは、JMS Namespace Supportを使って別の形で記述することもできます.その場合、destination="org.spring.jms.selector.queue" にはqueueの名前を指定します.beanへの参照ではありません.
設定ファイルは以下の通りです.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- Receiver-side context written with the Spring "jms" namespace instead of an explicit listener container bean. -->
<!-- Transport settings: Netty connector factory class name plus a host/port parameter map. -->
<bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
<constructor-arg
value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
<constructor-arg>
<map key-type="java.lang.String" value-type="java.lang.Object">
<entry key="host" value="localhost"></entry>
<entry key="port" value="5445"></entry>
</map>
</constructor-arg>
</bean>
<!-- Connection factory built by HornetQJMSClient.createConnectionFactoryWithoutHA
     from the factory type CF and the transport configuration above. -->
<bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient"
factory-method="createConnectionFactoryWithoutHA">
<constructor-arg type="org.hornetq.api.jms.JMSFactoryType"
value="CF" />
<constructor-arg ref="transportConfiguration" />
</bean>
<!-- MessageListener implementation that logs each received message. -->
<bean id="receiveListener" class="com.wanmei.jms.spring.selector.java.ReceiveListener"/>
<!-- Here "destination" is the queue name itself, not a bean reference, so no queue bean is declared.
     The selector expression limits delivery to messages whose TO_NODE string property equals 'to 127.0.0.1'.
     NOTE(review): concurrency="10" presumably caps the number of concurrent consumers; confirm against
     the Spring JMS namespace documentation for the version in use. -->
<jms:listener-container connection-factory="connectionFactory"
concurrency="10">
<jms:listener destination="org.spring.jms.selector.queue" ref="receiveListener" selector="TO_NODE='to 127.0.0.1'"/>
</jms:listener-container>
</beans>
詳細はSpring 3.1のリファレンスドキュメントを参照してください.