消費者側のSpring JMS接続ActiveMQ受信生産者Oozie Serverから送信されたOozie作業実行結果

14832 ワード

紹介します
OozieはHadoopワークフローサーバであり、Clientが提出したジョブ(MapReduce作業)の要求を受信し、MapReduceに提出して実行する。また、Oozieはメッセージ通知機能を実現することもできます。メッセージサーバを配置すれば、Oozie Serverはジョブの実行結果をメッセージサーバに送信することができます。Clientは関心のあるメッセージだけを購読すればいいです。具体的な配置はこの文章を参考にします。OozieはActiveMQを使ってJMS通知を実現します。
SpringはJMS関連のサービスを内蔵していますので、ここではSpringに消費者をActiveMQに接続する方法を設定し、生産者Oozieから発信されたメッセージを受信します。
 
二、生産者としてのOozie Server関連配置
これは主にこの文章のOozieでActiveMQを使ってJMS通知を実現することに言及しました。
Oozieのプロファイルoozie-default.xmlに関する構成は以下の通りです。
 <property>
        <name>oozie.jms.producer.connection.properties</name>
        <value>java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#tcp://l
ocalhost:61616;connectionFactoryNames#ConnectionFactory</value>
 </property>

<!-- JMSAccessorService -->
    <property>
        <name>oozie.service.JMSAccessorService.connectioncontext.impl</name>
        <value>
        org.apache.oozie.jms.DefaultConnectionContext
        </value>
        <description>
        Specifies the Connection Context implementation
        </description>
    </property>
 
Destinationの関連構成は、ここのDestinationはTopicであり、つまり生産者がメッセージを送る目的地であり、消費者がメッセージを取る場所でもある。
  <property>
        <name>oozie.service.JMSTopicService.topic.name</name>
        <value>
        default=${username}
        </value>
        <description>
        Topic options are ${username} or ${jobId} or a fixed string which can be specified as default or for a
        particular job type.
        For e.g To have a fixed string topic for workflows, coordinators and bundles,
        specify in the following comma-separated format: {jobtype1}={some_string1}, {jobtype2}={some_string2}
        where job type can be WORKFLOW, COORDINATOR or BUNDLE.
        e.g. Following defines topic for workflow job, workflow action, coordinator job, coordinator action,
        bundle job and bundle action
        WORKFLOW=workflow,
        COORDINATOR=coordinator,
        BUNDLE=bundle
        For jobs with no defined topic, default topic will be ${username}
        </description>
    </property>
 
三つ目は、Springに消費者の接続情報を配置する。
ここではJNDI接続ActiveMQを採用しています。接続情報の構成は以下の通りです。
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
        <property name="environment">
            <props>
                <prop key="java.naming.factory.initial">
                    org.apache.activemq.jndi.ActiveMQInitialContextFactory
                </prop>
                <prop key="java.naming.provider.url">
                    tcp://192.168.121.35:61616
                </prop>
                <prop key="java.naming.security.principal">
                    system
                </prop>
                <prop key="java.naming.security.credentials">
                    manager
                </prop>
            </props>
        </property>
    </bean>
 
接続工場の配置:
    <bean id="jndiTopicConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
        <property name="jndiTemplate" ref="jndiTemplate" />
        <property name="jndiName" value="ConnectionFactory" />
    </bean>
工場を結ぶvalue=「Connection Factory」をどうやって知っていますか?私はOozieのデフォルトの設定を採用していますので、Oozie公式サイトから提供された例示的なプログラムに従って、Oozieで使用している接続工場のデバッグを行います。
 
//  Oozie   JMS       , Transport Connectors
        OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie");
        JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();

        Properties jndiProperties = jmsInfo.getJNDIProperties();
        Context jndiContext = new InitialContext(jndiProperties);
このコードはActiveMQの接続文脈に確立されています。上記のコードを調整すると次のような情報が見られます。
{java.naming.provider.url=tcp://192.168.121.35:61616, 
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory, 
connectionFactoryNames=ConnectionFactory}
 
Topicの設定
    <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="cdhfive"></constructor-arg>
    </bean>
Topicはデスティネーションですね。oozie-default.xmlの中から生産者のTopicを得て$usernameです。ここのユーザー名はcdhfiveです。だからTopicの配置は以上の通りです。
 
モニターの設定
<bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="topicConnectionFactory"></property>
        <property name="destinationResolver" ref="destinationResolver"></property>
        <property name="concurrentConsumers" value="2"></property>
        <property name="destination" ref="notifyTopic"></property>
        <property name="messageListener" ref="messageListener"></property>
    </bean>
①モニターがモニターを必要とするデスティネーション、つまり上に配置されたTopicです。②モニターの実現beanであり、Bean implements javax.jms.Message Listenerインターフェース
    <bean id="messageListener" class="com.schedule.tools.message.SimpleJMSReceiver" />
 
これでほとんどの配置が完了しました。
 
四、モニターMessage Listenerインターフェースを実現し、メッセージを受信する。
メッセージが購読者に向けられたとき、javax.jms.Message Listenerインターフェースのone Message()方法が自動的に呼び出され、この方法で受信したメッセージを処理することができる。
@Override
    public void onMessage(Message message) {
        String parentJobId = null;
        String jobId = null;
        String errorMessage = null;
        String status = null;
        Date startTime = null;
        Date endTime = null;
        long runTime = -1;//-1 means job run error
        
        try {
            //            
            
            if (message.getStringProperty(JMSHeaderConstants.APP_TYPE).equals(
                    AppType.WORKFLOW_JOB.name())) {
                
                WorkflowJobMessage wfJobMessage = JMSMessagingUtils
                        .getEventMessage(message);

                //      
                jobId = wfJobMessage.getId();
                errorMessage = wfJobMessage.getErrorMessage();
                status = wfJobMessage.getStatus().toString();
                startTime = wfJobMessage.getStartTime();
                endTime = wfJobMessage.getEndTime();
                
                
                if(endTime != null){
                    runTime = endTime.getTime() - startTime.getTime();
                    System.out.println(jobId + "   :" + (endTime.getTime()-startTime.getTime())/1000 + "s");                    
                }

//other code.....
 
五、参考資料
『JAVAニュースサービス』電子工業出版社
https://oozie.apache.org/docs/4.0.0/DG_JMSNotifications.