【オリジナル】JMS生産者と消費者【PTP同時受信メッセージ】

10201 ワード

一般手順:
  • JMS接続工i工場を要求した.
  • は、接続ファクトリで接続を作成します.
  • JMS接続を開始します.
  • 接続によってセッションが作成されます.
  • ターゲットを取得します.
  • 生産者を作成する、またはa.生産者を作成し、b.JMSメッセージを作成し、ターゲット
  • に送信する.
  • 消費者を作成するか、a.消費者を作成し、b.メッセージリスナーを登録する.
  • メッセージを送信または受信します.
  • は、すべてのリソース(接続、セッション、生産者、消費者など)を閉じます.

  •  
    まずActiveMQバックグラウンドにログインしてTestQueueとしてキューを作成します.
    ..省略
    生産者の作成:
    package com.thunisoft.jms.mine;
    
    
    
    import java.util.HashMap;
    
    
    
    import javax.jms.Connection;
    
    import javax.jms.ConnectionFactory;
    
    import javax.jms.DeliveryMode;
    
    import javax.jms.Destination;
    
    import javax.jms.MessageProducer;
    
    import javax.jms.ObjectMessage;
    
    import javax.jms.Session;
    
    
    
    import org.apache.activemq.ActiveMQConnection;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    
    /**
    
     * JMS 
    
     * 
    
     * @author zhangxsh
    
     * 
    
     */
    
    public class Producer {
    
    
    
        /**
    
         * @param args
    
         */
    
        public static void main(String[] args) {
    
            ConnectionFactory connectionFactory;
    
            Connection connection = null;
    
            Session session;
    
            Destination destination;
    
            MessageProducer producer;
    
            connectionFactory = new ActiveMQConnectionFactory(
    
                    ActiveMQConnection.DEFAULT_USER,
    
                    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
    
            try {
    
                //  
    
                connection = connectionFactory.createConnection();
    
                //  
    
                connection.start();
    
                //  
    
                session = connection.createSession(Boolean.TRUE,
    
                        Session.AUTO_ACKNOWLEDGE);
    
                //  
    
                destination = session.createQueue("TestQueue");
    
                //  
    
                producer = session.createProducer(destination);
    
                //  
    
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    
                for (int i = 1; i <= 10; i++) {
    
                    ObjectMessage message = session.createObjectMessage();
    
                    HashMap m = new HashMap();
    
                    m.put("key" + i, i);
    
                    message.setObject(m);
    
                    //  
    
                    System.out.println(" :" + "ActiveMq  " + i);
    
                    producer.send(message);
    
                }
    
                session.commit();
    
            } catch (Exception e) {
    
                e.printStackTrace();
    
            } finally {
    
                try {
    
                    if (null != connection)
    
                        connection.close();
    
                } catch (Throwable ignore) {
    
                }
    
            }
    
        }
    
    }

    消費者:
    package com.thunisoft.jms.mine;
    
    
    
    import javax.jms.Connection;
    
    import javax.jms.ConnectionFactory;
    
    import javax.jms.Destination;
    
    import javax.jms.MessageConsumer;
    
    import javax.jms.ObjectMessage;
    
    import javax.jms.Session;
    
    
    
    import org.apache.activemq.ActiveMQConnection;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    
    /**
    
     * JMS 
    
     * 
    
     * @author zhangxsh
    
     * 
    
     */
    
    public class Consumer {
    
    
    
        /**
    
         * @param args
    
         */
    
        public static void main(String[] args) {
    
            ConnectionFactory connectionFactory;
    
            Connection connection = null;
    
            Session session;
    
            Destination destination;
    
            MessageConsumer consumer;
    
            connectionFactory = new ActiveMQConnectionFactory(
    
                    ActiveMQConnection.DEFAULT_USER,
    
                    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
    
            try {
    
                connection = connectionFactory.createConnection();
    
                connection.start();
    
                session = connection.createSession(Boolean.FALSE,
    
                        Session.AUTO_ACKNOWLEDGE);
    
                destination = session.createQueue("TestQueue");
    
                consumer = session.createConsumer(destination);
    
                while (true) {
    
                    ObjectMessage message = (ObjectMessage) consumer
    
                            .receive(100000);
    
                    if (null != message) {
    
                        System.out.println(" " + message.getObject());
    
                    } else {
    
                        break;
    
                    }
    
                }
    
            } catch (Exception e) {
    
                e.printStackTrace();
    
            } finally {
    
                try {
    
                    if (null != connection)
    
                        connection.close();
    
                } catch (Throwable ignore) {
    
                }
    
            }
    
    
    
        }
    
    
    
    }