MQ study

13883 ワード

一、簡単に述べる
EJB 2.0から、メッセージ駆動型のEJB、略称MDB(Message Driven Bean)が導入された。MOMがメッセージを受信すると、そのメッセージは自動的にこの種のBeanへ配送される。仕組みはイベント駆動と同じである。注意:MDBはメッセージの受信によってのみ駆動される。
EJB 2.1ではMDBがさらに強化され、例えばMailMessage、SMSMessage、SOAPMessageのような非JMSメッセージも処理できるようになった。
二、例
前章のソースコードを参照してください.
(1)MDBQueueBean.java:これがEJB本体です。他のEJBと違い、Home、Remoteなどのインタフェースは必要ありません。
(2)ejb-jar.xml:EJBのデプロイメント記述子。他の種類のEJBの記述とは大きく異なります。
(3)jboss.xml:JBoss固有のデプロイメント記述子。
デプロイに必要なのは以上の3つのファイルだけです。myMDB.jarを参照してください。
(4)MDBClient.java:メッセージを送信するクライアントプログラムです。実行すると、サーバ側のMDBがメッセージを受信して応答を返します。
三、MDBの違い:
プログラミング上は、通常のJMSプログラムと比べて、MDBでは初期化時にメッセージの宛先を取得してリスナーを登録する次の文が不要になるだけです。
      QueueReceiver queueReceiver = queueSession.createReceiver("queue/testQueue");
      queueReceiver.setMessageListener(this);
感想:バージョンの違いは厄介ですね。『J2EE応用開発詳細』(電子工業出版社、飛思科学技術監修)を買いましたが、掲載されている例は動きませんでした。主に設定ファイルに問題があり、そもそもコンパイルすら通らないようです。
一日余り試行錯誤して、ようやくJBossのウェブサイトでヘルプ文書を見つけました。まさに救いの星です:
http://docs.jboss.org/jbossas/admindevel326/html/ch6.chapt.html#d0e11155
ただし、私の環境ではまだ少し修正が必要でした。以下のソースコードを参照してください。
添付:
(1)MDBQueueBean.java

package study.jms;

import javax.ejb.MessageDrivenBean;
import javax.jms.MessageListener;
import javax.ejb.MessageDrivenContext;
//import javax.ejb.CreateException;
import javax.jms.Message;

/**add by zengabo */
import javax.ejb.EJBException;
import javax.naming.InitialContext;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import java.util.HashMap;
//import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.TextMessage;

/**
 * Message-driven bean that logs every JMS message it receives and answers the
 * sender with a short text acknowledgement on the message's reply-to queue.
 *
 * <p>The bean opens its own queue connection and session in
 * {@link #ejbCreate()}; they are used only for sending replies and are
 * released again in {@link #ejbRemove()}. Receiving is wired up by the
 * container, so no receiver or listener registration is needed here.
 */
public class MDBQueueBean
    implements MessageDrivenBean, MessageListener {
  MessageDrivenContext messageDrivenContext;

  /** added by zengabo */
  private Context myContext;
  private QueueConnectionFactory queueConnectionFactory;
  /** JNDI name of the connection factory; matches the jms/QCF resource-ref. */
  private String queueConnectionFactoryName = "java:comp/env/jms/QCF";
  private QueueConnection queueConnection;
  private QueueSession queueSession;
  /** Number of times ejbCreate() has run on this instance (for logging). */
  private int i_count = 0 ;

  public MDBQueueBean(){

  }

  /**
   * Opens the JMS resources used for replies.
   *
   * @throws EJBException if the connection factory lookup or the JMS setup
   *         fails; anything opened before the failure is closed first
   */
  public void ejbCreate() { //throws CreateException {
    System.out.println(" ejbCreate() is time: "+Integer.toString(++i_count));
    try{
       init();
    }catch (Exception e) {
      System.out.println(" init QueueMDB error"+e.toString());
      // init() may have failed after the connection was created; do not leak it.
      closeResources();
      throw new EJBException(" init QueueMDB error", e);
    }
  }

  /**
   * Looks up the connection factory and creates a started connection plus a
   * non-transacted, auto-acknowledging session.
   *
   * @throws NamingException if the connection factory cannot be looked up
   * @throws JMSException if the connection or session cannot be created
   */
  private void init() throws JMSException, NamingException {
    myContext =  new InitialContext();
    Object obj = myContext.lookup(queueConnectionFactoryName);
    queueConnectionFactory = (QueueConnectionFactory) obj;
    queueConnection = queueConnectionFactory.createQueueConnection();
    queueConnection.start();
    queueSession = queueConnection.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
    // Unlike a plain JMS client, an MDB does not create a QueueReceiver or
    // call setMessageListener(this); the container delivers the messages.
  }

  /**
   * Drops the context reference and releases the JMS resources.
   * Never throws: close failures are logged and otherwise ignored.
   */
  public void ejbRemove() {
    this.messageDrivenContext = null ;
    closeResources();
  }

  /**
   * Closes session, connection and naming context independently, so that a
   * failure while closing one of them does not keep the others open.
   */
  private void closeResources() {
    if (queueSession != null) {
      try {
        queueSession.close();
      } catch (Exception e) {
        System.out.println(" close QueueSession error"+e.toString());
      }
      queueSession = null;
    }
    if (queueConnection != null) {
      try {
        queueConnection.close();
      } catch (Exception e) {
        System.out.println(" close QueueConnection error"+e.toString());
      }
      queueConnection = null;
    }
    if (myContext != null) {
      try {
        myContext.close();
      } catch (Exception e) {
        System.out.println(" close Context error"+e.toString());
      }
      myContext = null;
    }
  }

  /**
   * Logs the received message according to its JMS type and then sends the
   * text " jboss recv ok" to the message's reply-to queue, if one is set.
   * Messages of an unknown type are logged and not answered.
   *
   * @param message the message delivered by the container
   */
  public void onMessage(Message message) {
    if (message instanceof javax.jms.BytesMessage) {
      javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) message;
      System.out.println("**** get bytesMessage:") ;
      System.out.println(bytesMessage.toString()) ;
    }
    else if (message instanceof javax.jms.MapMessage) {
      javax.jms.MapMessage mapMessage = (javax.jms.MapMessage) message;
      System.out.println("**** get mapMessage:") ;
      System.out.println(mapMessage.toString()) ;
    }
    else if (message instanceof javax.jms.ObjectMessage) {
      javax.jms.ObjectMessage objectMessage = (javax.jms.ObjectMessage) message;
      System.out.println("**** get ObjectMessage:") ;
      Object payload = null;
      try {
        payload = objectMessage.getObject();
      }
      catch (JMSException ex2) {
        System.out.println(ex2.toString());
      }
      // The payload may be unreadable (null here) or of another type; check
      // before use instead of failing with a NullPointerException or
      // ClassCastException inside the listener.
      if (payload instanceof HashMap) {
        HashMap hm = (HashMap) payload;
        System.out.println(hm.get("name")) ;
        System.out.println(hm.get("age")) ;
        System.out.println(hm.get("sex")) ;
      }
      else {
        System.out.println("**** ObjectMessage payload is not a HashMap: "+payload) ;
      }
    }
    else if (message instanceof javax.jms.StreamMessage) {
      javax.jms.StreamMessage streamMessage = (javax.jms.StreamMessage) message;
      System.out.println("**** get StreamMessage:") ;
      System.out.println(streamMessage.toString()) ;
    }
    else if (message instanceof javax.jms.TextMessage) {
      javax.jms.TextMessage textMessage = (javax.jms.TextMessage) message;
      String rev = "";
      System.out.println("**** get textMessage:") ;
      try {
        rev =  textMessage.getText();
        System.out.println(rev);
      }
      catch (JMSException ex1) {
        System.out.println(ex1.toString());
      }
    }
    else{
      System.out.println("**** Message is unknow type") ;
      return ;
    }

    // Acknowledge to the sender. A reply is only possible when the sender
    // supplied a queue as JMSReplyTo.
    try{
      javax.jms.Destination replyTo = message.getJMSReplyTo();
      if (replyTo instanceof Queue) {
        sendReply(" jboss recv ok", (Queue) replyTo);
      }
      else if (replyTo == null) {
        System.out.println("**** No JMSReplyTo set, reply skipped") ;
      }
      else {
        System.out.println("**** JMSReplyTo is not a Queue, reply skipped") ;
      }
    }catch(Exception ee){
      System.out.println("**** Reply Message Error"+ee.toString()) ;
    }
  }

  /**
   * Sends a text message to the given queue using the bean's own session.
   *
   * @param text body of the reply
   * @param dest queue to send the reply to
   * @throws JMSException if the sender cannot be created or the send fails
   */
  private void sendReply(String text, Queue dest) throws JMSException {
    QueueSender sender = queueSession.createSender(dest);
    try {
      TextMessage tm = queueSession.createTextMessage(text);
      sender.send(tm);
    }
    finally {
      // Close the sender even when createTextMessage/send throws.
      sender.close();
    }
  }

  /**
   * Stores the context handed over by the container.
   *
   * @param messageDrivenContext the bean's message-driven context
   */
  public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
    this.messageDrivenContext = messageDrivenContext;
  }
}

(2)ejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd">
<!-- EJB 2.0 deployment descriptor declaring one message-driven bean, MDBQueue. -->
<ejb-jar>
  <display-name>myMDB</display-name>
  <enterprise-beans>
    <message-driven>
      <!-- Logical bean name; jboss.xml refers to the bean by this name. -->
      <ejb-name>MDBQueue</ejb-name>
      <ejb-class>study.jms.MDBQueueBean</ejb-class>
      <!-- Container-managed transactions; the attribute is set in assembly-descriptor below. -->
      <transaction-type>Container</transaction-type>
      <message-driven-destination>
        <!-- The bean consumes from a queue (not a topic). -->
        <destination-type>javax.jms.Queue</destination-type>
      </message-driven-destination>
      <!-- Connection factory the bean looks up as java:comp/env/jms/QCF. -->
      <resource-ref>
        <description>test mdb</description>
        <res-ref-name>jms/QCF</res-ref-name>
        <res-type>javax.jms.QueueConnectionFactory</res-type>
        <res-auth>Container</res-auth>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
  <assembly-descriptor>
    <!-- Applies the Required transaction attribute to every method of MDBQueue. -->
    <container-transaction>
      <method>
        <ejb-name>MDBQueue</ejb-name>
        <method-name>*</method-name>
      </method>
      <trans-attribute>Required</trans-attribute>
    </container-transaction>
  </assembly-descriptor>
</ejb-jar>

(3)jboss.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE jboss PUBLIC "-//JBoss//DTD JBOSS 3.2//EN" "http://www.jboss.org/j2ee/dtd/jboss_3_2.dtd">
<!-- JBoss 3.2 descriptor binding the MDBQueue bean from ejb-jar.xml to JNDI names. -->
<jboss>
  <enterprise-beans>
    <message-driven>
      <!-- Must match the ejb-name declared in ejb-jar.xml. -->
      <ejb-name>MDBQueue</ejb-name>
      <!-- JNDI name of the bean's destination; MDBClient sends its messages to queue/B. -->
      <destination-jndi-name>queue/B</destination-jndi-name>
      <configuration-name>Standard Message Driven Bean</configuration-name>
      <!-- Maps the bean's jms/QCF resource reference to the JNDI name ConnectionFactory. -->
      <resource-ref>
        <res-ref-name>jms/QCF</res-ref-name>
        <jndi-name>ConnectionFactory</jndi-name>
      </resource-ref>
    </message-driven>
  </enterprise-beans>
</jboss>

(4)MDBClient.java

package study.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

import java.util.Hashtable;
import javax.naming.Context;

/**
 *  A complete JMS client example program that sends N TextMessages to
 *  Queue B, naming Queue A as the reply-to destination, and asynchronously
 *  receives the replies from Queue A.
 *
 *  @author [email protected]
 *  @version $Revision: 1.10 $
 */
public class MDBClient
{
    /** Number of messages sent, and number of replies awaited before exit. */
    static final int N = 10;
    /** Counted down once per message received on Queue A. */
    static CountDown done = new CountDown(N);

    QueueConnection conn;
    QueueSession session;
    Queue queA;
    Queue queB;

    /** Listener for Queue A: counts each message and prints text bodies. */
    public static class ExListener
        implements MessageListener
    {
        public void onMessage(Message msg)
        {
            // Count first so that main() is released even if printing fails.
            done.release();
            if (!(msg instanceof TextMessage)) {
                // Avoid a ClassCastException escaping from the listener.
                System.out.println("onMessage, recv non-text message="+msg);
                return;
            }
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text="+tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }

    /**
     * Looks up the connection factory and both queues, then creates a
     * non-transacted, auto-acknowledging session and starts the connection.
     *
     * @throws NamingException if a JNDI lookup fails
     * @throws JMSException if the connection or session cannot be created
     */
    public void setupPTP()
        throws JMSException, NamingException
    {
        InitialContext iniCtx =  getInitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory"); //java:/XAConnectionFactory ConnectionFactory
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        queA = (Queue) iniCtx.lookup("queue/A");
        queB = (Queue) iniCtx.lookup("queue/B");
        session = conn.createQueueSession(false,
                                          QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    /**
     * Builds an initial context that uses the JNP naming factory and the
     * provider URL jnp://localhost:1099.
     *
     * @throws NamingException if the context cannot be created
     */
    private InitialContext getInitialContext() throws NamingException {
      Hashtable environment = new Hashtable();
      environment.put(Context.INITIAL_CONTEXT_FACTORY,
                      "org.jnp.interfaces.NamingContextFactory");
      environment.put(Context.URL_PKG_PREFIXES,
                      "org.jboss.naming:org.jnp.interfaces");
      environment.put(Context.PROVIDER_URL, "jnp://localhost:1099");
      return new InitialContext(environment);
    }

    /**
     * Registers the asynchronous listener on Queue A and sends N text
     * messages of the form textBase#index to Queue B.
     *
     * @param textBase prefix of every message body
     */
    public void sendRecvAsync(String textBase)
        throws JMSException, NamingException, InterruptedException
    {
        System.out.println("Begin sendRecvAsync");

        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener for queA
        QueueReceiver recv = session.createReceiver(queA);
        recv.setMessageListener(new ExListener());

        // Send the text msgs to queB
        QueueSender send = session.createSender(queB);

        // Bound by N so the number sent always equals the number awaited.
        for(int m = 0; m < N; m ++) {
            TextMessage tm = session.createTextMessage(textBase+"#"+m);
            tm.setJMSReplyTo(queA);
            send.send(tm);
            System.out.println("sendRecvAsync, sent text="+tm.getText());
        }
        System.out.println("End sendRecvAsync");
    }

    /**
     * Stops and closes the connection and its session. Does nothing when no
     * connection was opened.
     *
     * @throws JMSException if stopping or closing fails
     */
    public void stop()
        throws JMSException
    {
        if (conn == null) {
            return;
        }
        try {
            conn.stop();
            if (session != null) {
                session.close();
            }
        } finally {
            // Always release the connection, even if the session close failed.
            conn.close();
        }
    }

    /** Sends the messages, waits for N replies, then shuts down. */
    public static void main(String args[])
        throws Exception
    {
        System.out.println("Begin MDBClient,now=" +
                           System.currentTimeMillis());
        MDBClient client = new MDBClient();
        try {
            client.sendRecvAsync("A text msg");
            done.acquire();
        } finally {
            // Close the connection on failure too.
            client.stop();
        }
        // Print before exiting; nothing after System.exit() is ever executed.
        System.out.println("End MDBClient");
        System.exit(0);
    }

}