ActiveMQ Connectionにおけるキュー監視の使用とテスト
DestinationSourceを使用して、現在のConnectionのqueueとtopicの個数と情報を取得し、その追加・削除イベントを監視します。
package easyway.app.activemq.demo.monitors;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.DestinationEvent;
import org.apache.activemq.advisory.DestinationListener;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.advisory.ProducerEvent;
import org.apache.activemq.advisory.ProducerEventSource;
import org.apache.activemq.advisory.ProducerListener;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Demo that monitors the destinations and producers visible on an ActiveMQ
 * {@link ActiveMQConnection}.
 *
 * <p>The monitor starts an in-process {@link BrokerService} on {@link #bindAddress} with
 * {@link #sampleQueue} and {@link #sampleTopic} pre-registered, connects to it, logs the
 * destinations reported by the connection's {@link DestinationSource}, and registers itself
 * both as a {@link DestinationListener} and as a {@link ProducerListener}.
 *
 * <p>Listener callbacks arrive on threads owned by the JMS client; this class does no
 * synchronization of its own beyond the {@link BlockingQueue} used for producer events.
 *
 * @author longgangbai
 */
public class DestinationSourceMonitor implements DestinationListener, ProducerListener {

    private static final Logger LOG = LoggerFactory.getLogger(DestinationSourceMonitor.class);

    /** Maximum number of producer events buffered between the listener and the waiting caller. */
    private static final int EVENT_QUEUE_CAPACITY = 1000;

    /** How long {@link #waitForProducerEvent()} blocks before giving up, in milliseconds. */
    private static final long EVENT_WAIT_MILLIS = 100000L;

    protected ActiveMQConnection connection;
    protected ActiveMQConnectionFactory connectionFactory;
    /** Queue registered on the broker at start-up by {@link #createBroker()}. */
    protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar");
    /** Topic registered on the broker at start-up by {@link #createBroker()}. */
    protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese");
    /** Selects whether {@link #createDestination(String)} builds a topic (true) or a queue. */
    protected Boolean useTopic = true;
    /** Incremented once per {@link #createSession()} call; only used in log output. */
    protected int consumerCounter;
    protected BrokerService broker;
    protected Session consumerSession1;
    protected Session consumerSession2;
    protected ProducerEventSource producerEventSource;
    /** Destinations reported as added (and not yet removed) via {@link #onDestinationEvent}. */
    protected List<ActiveMQDestination> newDestinations = new ArrayList<ActiveMQDestination>();
    /** Address used both for the broker connector and for the client connection factory. */
    String bindAddress = "tcp://localhost:61619"; // ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
    /** Hand-off buffer filled by {@link #onProducerEvent} and drained by {@link #waitForProducerEvent()}. */
    protected BlockingQueue<ProducerEvent> eventQueue =
            new ArrayBlockingQueue<ProducerEvent>(EVENT_QUEUE_CAPACITY);
    /** Destination whose producers are watched; set by {@link #init()}. */
    protected ActiveMQDestination destination;

    /**
     * Logs the queues, topics, temporary queues and temporary topics currently reported by the
     * connection's {@link DestinationSource}, and whether the sample queue/topic are among them.
     *
     * <p>Requires {@link #init()} to have been called so that {@link #connection} is set.
     *
     * @throws Exception if the destination source cannot be obtained or started
     */
    public void destiationSourceHasInitialDestinations() throws Exception {
        DestinationSource destinationSource = connection.getDestinationSource();

        // Regular (non-temporary) destinations known to the connection.
        Set<ActiveMQQueue> queues = destinationSource.getQueues();
        Set<ActiveMQTopic> topics = destinationSource.getTopics();
        // Temporary destinations known to the connection.
        Set<ActiveMQTempQueue> tmpqueues = destinationSource.getTemporaryQueues();
        Set<ActiveMQTempTopic> tmptopics = destinationSource.getTemporaryTopics();

        LOG.info("Number of Queues: " + queues.size());
        LOG.info("Queues: " + queues);
        LOG.info("Number of topics: " + topics.size());
        LOG.info("Topics: " + topics);
        for (ActiveMQTempTopic topic : tmptopics) {
            LOG.info("topic: " + topic);
        }
        LOG.info("Number of ActiveMQTempQueue: " + tmpqueues.size());
        LOG.info("ActiveMQTempQueue: " + tmpqueues);
        LOG.info("Number of ActiveMQTempTopic: " + tmptopics.size());
        LOG.info("ActiveMQTempTopic: " + tmptopics);
        LOG.info("The queues should not be empty!" + " ," + !queues.isEmpty());
        LOG.info("The topics should not be empty!" + " ," + !topics.isEmpty());
        LOG.info("the connection contains initial queue: " + queues + "," + queues.contains(sampleQueue));
        // Fixed: this line used to print the queue set next to the topic check.
        LOG.info("the connection contains initial topic: " + topics + " ," + topics.contains(sampleTopic));

        // NOTE(review): the source may already be running by the time it is returned from the
        // connection; start() is kept from the original and is presumably a no-op then - confirm.
        destinationSource.start();
    }

    /**
     * Exercises the producer listener: opens two producing sessions, then closes them one at a
     * time, logging the producer event observed after each step.
     *
     * <p>Requires {@link #init()} to have been called.
     *
     * @throws Exception if a session cannot be created or closed, or the wait is interrupted
     */
    public void productorMonitor() throws Exception {
        consumerSession1 = createSession();
        consumerSession2 = createSession();
        // NOTE(review): init() already started the event source; presumably idempotent - confirm.
        producerEventSource.start();
        assertConsumerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertConsumerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /**
     * Creates a non-transacted, auto-acknowledge session with one producer on
     * {@link #destination}.
     *
     * @return the newly created session; closing it also closes its producer
     * @throws JMSException if the session or producer cannot be created
     */
    protected Session createSession() throws JMSException {
        final String consumerText = "Consumer: " + (++consumerCounter);
        LOG.info("Creating consumer: " + consumerText + " on destination: " + destination);
        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Fixed: a bare session does not register a producer, so the ProducerEventSource had
        // nothing to report and every waitForProducerEvent() call ran into its timeout.
        answer.createProducer(destination);
        return answer;
    }

    /**
     * Waits for the next producer event and logs the expected versus actual producer count and
     * started flag. Nothing is thrown on a mismatch; the comparison is only logged.
     *
     * @param count   expected producer count
     * @param started expected value of {@link ProducerEvent#isStarted()}
     * @throws InterruptedException if interrupted while waiting for the event
     */
    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
        ProducerEvent event = waitForProducerEvent();
        if (event == null) {
            // Fixed: a timed-out wait used to cause a NullPointerException here.
            LOG.warn("No producer event received; expected count=" + count + ", started=" + started);
            return;
        }
        LOG.info("Producer count =" + count + ", " + event.getProducerCount());
        LOG.info(" Producer started" + " =" + started + " ," + event.isStarted());
    }

    /**
     * Blocks until a producer event is available or the wait times out.
     *
     * @return the next queued event, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
        ProducerEvent answer = eventQueue.poll(EVENT_WAIT_MILLIS, TimeUnit.MILLISECONDS);
        LOG.info("Should have received a consumer event!" + " ," + (answer != null));
        return answer;
    }

    /**
     * Runs the demo end to end: start broker and connection, log destinations, exercise the
     * producer listener, then release the connection and stop the broker.
     *
     * @param args ignored
     * @throws Exception if any step of the demo fails
     */
    public static void main(String[] args) throws Exception {
        DestinationSourceMonitor monitor = new DestinationSourceMonitor();
        try {
            monitor.init();
            monitor.destiationSourceHasInitialDestinations();
            monitor.productorMonitor();
        } finally {
            // Fixed: the connection was never closed and the broker was left running on failure.
            try {
                monitor.destory();
            } finally {
                monitor.stopBroker();
            }
        }
    }

    /**
     * Creates a new JMS connection from {@link #connectionFactory}.
     *
     * @return a new, not yet started connection
     * @throws Exception if the connection cannot be created
     */
    protected Connection createConnection() throws Exception {
        return connectionFactory.createConnection();
    }

    /**
     * Builds a destination for the given name, as a topic or a queue depending on
     * {@link #useTopic}.
     *
     * @param subject physical name of the destination
     * @return an {@link ActiveMQTopic} when {@link #useTopic} is true, otherwise an {@link ActiveMQQueue}
     */
    protected ActiveMQDestination createDestination(String subject) {
        if (useTopic) {
            return new ActiveMQTopic(subject);
        } else {
            return new ActiveMQQueue(subject);
        }
    }

    /**
     * Returns the name of the destination used by this demo: the runtime class name followed by
     * {@code ".activemq"}.
     */
    protected String getDestinationString() {
        return getClass().getName() + "." + "activemq";
    }

    /**
     * Factory method to create a new {@link ConnectionFactory} instance pointing at
     * {@link #bindAddress}.
     *
     * @return a newly created connection factory
     * @throws Exception declared for subclasses; this implementation does not throw it itself
     */
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(bindAddress);
    }

    /**
     * Starts {@link #broker}. The broker must already have been created.
     *
     * @throws Exception if the broker fails to start
     */
    protected void startBroker() throws Exception {
        broker.start();
    }

    /**
     * Stops {@link #broker} if one was created; does nothing otherwise.
     *
     * @throws Exception if the broker fails to stop
     */
    public void stopBroker() throws Exception {
        // Guard so that cleanup after a failed init() does not raise a NullPointerException.
        if (broker != null) {
            broker.stop();
        }
    }

    /**
     * @return whether or not persistence should be used
     */
    protected boolean isPersistent() {
        return false;
    }

    /**
     * Tracks destination changes: added destinations are appended to {@link #newDestinations},
     * removed ones are taken out of it. Every change is logged.
     *
     * @param event the add/remove notification
     */
    public void onDestinationEvent(DestinationEvent event) {
        // Named differently from the field 'destination' to avoid shadowing it.
        ActiveMQDestination changed = event.getDestination();
        if (event.isAddOperation()) {
            LOG.info("Added: " + changed);
            newDestinations.add(changed);
        } else {
            LOG.info("Removed: " + changed);
            newDestinations.remove(changed);
        }
    }

    /**
     * Brings up everything the demo needs: broker (created on demand), connection factory,
     * monitored destination, a started connection with this object as destination listener, and
     * a started {@link ProducerEventSource} with this object as producer listener.
     *
     * @throws Exception if the broker, connection or event source cannot be started
     */
    protected void init() throws Exception {
        if (broker == null) {
            broker = createBroker();
        }
        startBroker();
        connectionFactory = createConnectionFactory();
        destination = createDestination();

        // Destination monitoring.
        connection = (ActiveMQConnection) createConnection();
        connection.start();
        connection.getDestinationSource().setDestinationListener(this);

        // Producer monitoring for the demo destination.
        producerEventSource = new ProducerEventSource(connection, destination);
        producerEventSource.setProducerListener(this);
        producerEventSource.start();
    }

    /**
     * Builds the destination named by {@link #getDestinationString()}.
     *
     * @return the destination monitored for producer events
     */
    protected ActiveMQDestination createDestination() {
        return createDestination(getDestinationString());
    }

    /**
     * Creates (but does not start) a broker listening on {@link #bindAddress} with the sample
     * queue and topic registered. The new broker is also stored in {@link #broker}.
     *
     * @return the configured broker
     * @throws Exception if the connector cannot be added
     */
    protected BrokerService createBroker() throws Exception {
        broker = new BrokerService();
        broker.setPersistent(isPersistent());
        broker.addConnector(bindAddress);
        broker.setDestinations(new ActiveMQDestination[] {
            sampleQueue,
            sampleTopic
        });
        return broker;
    }

    /**
     * Releases client-side resources: stops the producer event source and closes the
     * connection. Either may be absent if {@link #init()} did not complete.
     *
     * @throws Exception if stopping the event source or closing the connection fails
     */
    protected void destory() throws Exception {
        try {
            if (producerEventSource != null) {
                producerEventSource.stop();
            }
        } finally {
            // Close the connection even when stopping the event source failed.
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Queues the event for {@link #waitForProducerEvent()}. If the buffer is full the event is
     * dropped with a warning instead of throwing on the listener thread.
     *
     * @param event the producer notification
     */
    public void onProducerEvent(ProducerEvent event) {
        if (!eventQueue.offer(event)) {
            LOG.warn("Producer event queue is full; dropping event: " + event);
        }
    }
}