分散メッセージミドルウェア(一)——ActiveMQポイントツーポイントメッセージモード

5262 ワード

一、ダウンロードと実行 1、公式サイトからダウンロードします。     Windows版:apache-activemq-5.13.3-bin.zip     Linux版:apache-activemq-5.13.3-bin.tar.gz 2、Windowsでは、OSのビット数(32ビット/64ビット)に対応するactivemq.batファイルを実行して、ActiveMQサービスを開始します。    注意:先にJavaの環境変数を設定してください。設定していないと起動しません。3、http://localhost:8161/admin にアクセスし、MQサービスに正常にアクセスできることを確認します。二、ActiveMQポイントツーポイントメッセージモード    ActiveMQのポイントツーポイントモードのクライアントプログラムを作成します。1、activemq-all-51.1.jarのJARパッケージをプロジェクトに導入する
2、メッセージ生産者(プロデューサー)
/**
 *      
 * @author Max
 *
 */
public class JMSProducer {

	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; //         
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; //        
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; //        
	private static final int SENDNUM=10; //        
	
	public static void main(String[] args) {
		
		ConnectionFactory connectionFactory; //     
		Connection connection = null; //   
		Session session; //               
		Destination destination; //       
		MessageProducer messageProducer; //      
		
		//        
		connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		
		try {
			connection=connectionFactory.createConnection(); //           
			connection.start(); //     
			session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //   Session;       3 
			destination=session.createQueue("FirstQueue1"); //       
			messageProducer=session.createProducer(destination); //        
			sendMessage(session, messageProducer); //     
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally{
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 *     
	 * @param session
	 * @param messageProducer
	 * @throws Exception
	 */
	public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
		for(int i=0;i<JMSProducer.SENDNUM;i++){
			TextMessage message=session.createTextMessage("ActiveMQ      "+i);
			System.out.println("    :"+"ActiveMQ      "+i);
			messageProducer.send(message);
		}
	}
3、メッセージ消費者
/**
 * Point-to-point message consumer using synchronous (blocking) receive.
 *
 * <p>Reads messages from the queue {@code "FirstQueue1"} until no message
 * arrives within the receive timeout, then closes the connection.
 *
 * @author Max
 */
public class JMSConsumer {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // default connection user name
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // default connection password
	private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // default broker URL

	/**
	 * Receives and prints text messages until the queue stays empty for the
	 * whole receive timeout.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,
				JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
		Connection connection = null; // kept outside the try so that finally can close it

		try {
			connection = connectionFactory.createConnection();
			connection.start();
			// Non-transacted session with automatic acknowledgement.
			Session session = connection.createSession(false,
					Session.AUTO_ACKNOWLEDGE);
			// Must be the same queue name the producer sends to.
			Destination destination = session.createQueue("FirstQueue1");
			MessageConsumer messageConsumer = session.createConsumer(destination);
			while (true) {
				// The timeout is in milliseconds: wait up to 100 seconds per message.
				Message message = messageConsumer.receive(100000);
				if (message == null) {
					// Nothing arrived within the timeout: stop consuming.
					break;
				}
				if (message instanceof TextMessage) {
					System.out.println("     :" + ((TextMessage) message).getText());
				} else {
					// Report other message types instead of failing with ClassCastException.
					System.err.println("Skipping non-text message: " + message);
				}
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			// Always release the connection, whether the loop ended normally or failed.
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
Listener(リスナー)による監視:
/**
 * Point-to-point message consumer using an asynchronous {@link Listener}.
 *
 * <p>Registers a listener on the queue {@code "FirstQueue1"} and then starts
 * the connection. On success the connection is left open on purpose so the
 * listener can keep receiving messages.
 */
public class JMSConsumer2 {
	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // default connection user name
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // default connection password
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // default broker URL

	/**
	 * Sets up the consumer and registers the message listener.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
		Connection connection = null; // kept outside the try so the failure path can close it

		try {
			connection=connectionFactory.createConnection();
			// Non-transacted session with automatic acknowledgement.
			Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// Must be the same queue name the producer sends to.
			Destination destination=session.createQueue("FirstQueue1");
			MessageConsumer messageConsumer=session.createConsumer(destination);
			// Register the listener first, then start delivery, so that no message
			// is delivered while the consumer is still being set up.
			messageConsumer.setMessageListener(new Listener());
			connection.start();
		} catch (JMSException e) {
			e.printStackTrace();
			// Setup failed: release the connection instead of leaving it half-configured.
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException closeError) {
					closeError.printStackTrace();
				}
			}
		}
	}
}

/**
 * Message listener that prints the text of every {@link TextMessage} it receives.
 *
 * @author Administrator
 */
public class Listener implements MessageListener{

	/**
	 * Prints the body of a text message; any other message type is reported
	 * on standard error and otherwise ignored.
	 *
	 * @param message the message delivered to this listener
	 */
	@Override
	public void onMessage(Message message) {
		if (!(message instanceof TextMessage)) {
			// Avoid a ClassCastException escaping from the listener callback.
			System.err.println("Skipping non-text message: " + message);
			return;
		}
		try {
			System.out.println("     :"+((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}