FlumeとMosquittoの連携

21911 ワード

出典:http://www.cnblogs.com/hark0623/p/4173714.html   転載する際は出典を明記してください。
 
業務上の要件により、FlumeでMQTTのデータを収集する必要があります。方法としては、Flumeのカスタムsourceを作成し、そのsourceの中でMQTTのトピックを購読します。
 
flume sourceのjavaコードは以下の通りです。
package com.yhx.sensor.flume.source;



import java.util.HashMap;

import java.util.Map;



import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.EventDrivenSource;

import org.apache.flume.conf.Configurable;

import org.apache.flume.event.EventBuilder;

import org.apache.flume.source.AbstractSource;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttTopic;



/**
 * Flume event-driven source that subscribes to an MQTT topic and hands the
 * payload of every received message to the source's channel processor as a
 * Flume event.
 *
 * <p>The following source properties are optional; when a property is absent
 * or blank the previously hard-coded value is used:
 * <ul>
 * <li>{@code brokerUrl} - MQTT broker URI (default
 * {@code tcp://192.168.116.128:1883})</li>
 * <li>{@code topic} - topic to subscribe to (default
 * {@code 192.168.116.128/yhx/yhx_flume})</li>
 * <li>{@code clientId} - MQTT client identifier (default
 * {@code yhx_flume})</li>
 * </ul>
 */
public class MQTTSource extends AbstractSource implements EventDrivenSource,
        Configurable {

    /** Broker URI from the agent configuration; null means "use the default". */
    private String configuredBrokerUrl;

    /** Topic from the agent configuration; null means "use the default". */
    private String configuredTopic;

    /** Client id from the agent configuration; null means "use the default". */
    private String configuredClientId;

    /** The MQTT client created by {@link #start()}; null while the source is stopped. */
    SimpleMqttClient client = null;

    /**
     * Reads the optional {@code brokerUrl}, {@code topic} and {@code clientId}
     * properties of this source. Missing or blank values leave the built-in
     * defaults in effect.
     *
     * @param arg0 the configuration context Flume passes to this source
     */
    @Override
    public void configure(Context arg0) {
        configuredBrokerUrl = blankToNull(arg0.getString("brokerUrl"));
        configuredTopic = blankToNull(arg0.getString("topic"));
        configuredClientId = blankToNull(arg0.getString("clientId"));
    }

    /** Returns the trimmed value, or null when the value is null or blank. */
    private static String blankToNull(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        return trimmed.length() == 0 ? null : trimmed;
    }

    /**
     * Connects to the broker, subscribes, and then returns. Messages keep
     * arriving afterwards through the {@link MqttCallback} methods of
     * {@link SimpleMqttClient}, so this method must not block.
     *
     * @throws IllegalStateException if the broker connection or the
     *         subscription cannot be established
     */
    @Override
    public void start() {
        client = new SimpleMqttClient();
        // Throws on failure (after releasing the client) instead of
        // terminating the JVM, so the error reaches whoever started the source.
        client.runClient();
        // Let AbstractSource run its own start logic only once we are connected.
        super.start();
    }

    /**
     * Disconnects from the broker and then runs the inherited stop logic.
     */
    @Override
    public void stop() {
        try {
            if (client != null) {
                client.closeConn();
                client = null;
            }
        } finally {
            super.stop();
        }
    }

    /**
     * MQTT client that turns every message received on the subscribed topic
     * into a Flume event. It is an inner (non-static) class because
     * {@link #messageArrived(String, MqttMessage)} uses the enclosing source's
     * channel processor.
     */
    public class SimpleMqttClient implements MqttCallback {

        MqttClient myClient;
        MqttConnectOptions connOpt;

        // Defaults used when the corresponding source property is not set.
        String BROKER_URL = "tcp://192.168.116.128:1883";
        String M2MIO_DOMAIN = "192.168.116.128";
        String M2MIO_STUFF = "yhx";
        String M2MIO_THING = "yhx_flume";
        // No user name / password is set on the connection options.

        Boolean subscriber = true;
        Boolean publisher = false;

        /**
         * Invoked by the MQTT library when the broker connection is lost.
         * No reconnect is attempted here.
         *
         * @param t the cause of the connection loss
         */
        @Override
        public void connectionLost(Throwable t) {
            System.out.println("Connection lost!");
            if (t != null) {
                t.printStackTrace();
            }
        }

        /**
         * Disconnects (when connected) and releases the underlying client.
         * Failures are reported on stderr and otherwise ignored so that
         * shutdown can always proceed. Safe to call more than once.
         */
        public void closeConn() {
            MqttClient closing = myClient;
            if (closing == null) {
                return;
            }
            myClient = null;
            try {
                if (closing.isConnected()) {
                    closing.disconnect();
                }
            } catch (MqttException e) {
                e.printStackTrace();
            }
            try {
                // Release the resources held by the client object itself.
                closing.close();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /**
         * Invoked when a message published by this client has been delivered.
         * Nothing needs to be done for this source.
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // intentionally empty
        }

        /**
         * Invoked when a message arrives on the subscribed topic. The payload
         * becomes the body of a Flume event with an empty header map.
         *
         * @param topic the topic the message was published to
         * @param message the received message
         */
        @Override
        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            Map<String, String> headers = new HashMap<String, String>();
            Event flumeEvent = EventBuilder.withBody(message.getPayload(),
                    headers);
            try {
                getChannelProcessor().processEvent(flumeEvent);
            } catch (Exception e) {
                // Best effort by design: the failure is reported and the
                // message dropped rather than propagated into the MQTT
                // library's callback thread.
                // NOTE(review): Paho documents that an exception thrown from
                // messageArrived shuts the client down - confirm for the
                // library version in use.
                e.printStackTrace();
            }
        }

        /**
         * Creates the MQTT client, connects to the broker and subscribes to
         * the topic (and publishes ten sample messages when
         * {@code publisher} is true). Returns as soon as that is done.
         *
         * @throws IllegalStateException if connecting or subscribing fails;
         *         the client is released before the exception is thrown
         */
        public void runClient() {
            String brokerUrl = configuredBrokerUrl != null
                    ? configuredBrokerUrl : BROKER_URL;
            String clientID = configuredClientId != null
                    ? configuredClientId : M2MIO_THING;
            String myTopic = configuredTopic != null
                    ? configuredTopic
                    : M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/" + M2MIO_THING;

            connOpt = new MqttConnectOptions();
            connOpt.setCleanSession(true);
            // NOTE(review): Paho documents the keep-alive interval in
            // seconds, which would make this 50 minutes - confirm intent.
            connOpt.setKeepAliveInterval(3000);

            // Connect to the broker.
            try {
                myClient = new MqttClient(brokerUrl, clientID);
                myClient.setCallback(this);
                myClient.connect(connOpt);
            } catch (MqttException e) {
                closeConn();
                throw new IllegalStateException(
                        "Unable to connect to MQTT broker " + brokerUrl, e);
            }

            System.out.println("Connected to " + brokerUrl);
            System.out.println("myTopic:" + myTopic);

            // Subscribe to the topic if acting as subscriber.
            if (subscriber) {
                try {
                    int subQoS = 0;
                    myClient.subscribe(myTopic, subQoS);
                } catch (Exception e) {
                    // A source that is connected but not subscribed would
                    // silently receive nothing, so fail loudly instead.
                    closeConn();
                    throw new IllegalStateException(
                            "Unable to subscribe to MQTT topic " + myTopic, e);
                }
            }

            // Publish sample messages if acting as publisher.
            if (publisher) {
                // Only needed for publishing, so it is resolved here rather
                // than unconditionally.
                MqttTopic topic = myClient.getTopic(myTopic);
                for (int i = 1; i <= 10; i++) {
                    String pubMsg = "{\"pubmsg\":" + i + "}";
                    int pubQoS = 0;
                    MqttMessage message = new MqttMessage(pubMsg.getBytes());
                    message.setQos(pubQoS);
                    message.setRetained(false);

                    System.out.println("Publishing to topic \"" + topic
                            + "\" qos " + pubQoS);
                    MqttDeliveryToken token = null;
                    try {
                        token = topic.publish(message);
                        // Wait until the message has been delivered to the
                        // broker.
                        token.waitForCompletion();
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt and stop publishing.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            // No keep-alive loop here: the connection stays open and incoming
            // messages are delivered through the callbacks until closeConn().
        }

    }

}
JARファイルを作成するときは、マニフェストにClass-Pathを記述してください。次のようにします。
Manifest-Version: 1.0

Class-Path: flume-ng-configuration-1.5.2.jar flume-ng-core-1.5.2.jar flume-ng-node-1.5.2.jar flume-ng-sdk-1.5.2.jar org.eclipse.paho.client.mqttv3-1.0.0.jar
 
作成したJARファイルをFlumeのlibディレクトリに配置します。(注意:Class-Pathに記載したJARもlibディレクトリに含まれている必要があります。存在しない場合は、libディレクトリにコピーしてください。)
 
次にFlumeの設定ファイルを修正します。以下を参照してください。UDP関連の設定も含まれているのは、同じエージェントでUDPも監視しているためです。
# Agent "a1" declares two sources, two sinks and two channels:
# one set for MQTT and one set for UDP syslog.
a1.sources = sourceMqtt sourceUdp

a1.sinks = sinkMqtt sinkUdp

a1.channels = channelMqtt channelUdp



# ---- MQTT flow ----
# Source: the custom MQTTSource class, referenced by its fully qualified name

a1.sources.sourceMqtt.type = com.yhx.sensor.flume.source.MQTTSource



# Sink: Flume's "logger" sink type

a1.sinks.sinkMqtt.type = logger



# Channel: buffers events in memory

a1.channels.channelMqtt.type = memory

a1.channels.channelMqtt.capacity = 1000

a1.channels.channelMqtt.transactionCapacity = 100



# Bind the MQTT source and sink to the MQTT channel

a1.sources.sourceMqtt.channels = channelMqtt

a1.sinks.sinkMqtt.channel = channelMqtt







# Disabled declarations for a separate agent "a2"; the UDP components
# below are defined on agent a1 instead.
# a2.sources = sourceUdp

# a2.sinks = sinkUdp

# a2.channels = channelUdp



# ---- UDP flow ----
# Source: syslog over UDP, bound to 0.0.0.0 port 12459, with one interceptor

a1.sources.sourceUdp.type = syslogudp

a1.sources.sourceUdp.host = 0.0.0.0

a1.sources.sourceUdp.port = 12459

a1.sources.sourceUdp.interceptors=interceptorUdp



a1.sources.sourceUdp.interceptors.interceptorUdp.type=com.yhx.sensor.flume.intercepter.UDPIntercepter$Builder



# Sink: Flume's "logger" sink type

a1.sinks.sinkUdp.type = logger



# Channel: buffers events in memory

a1.channels.channelUdp.type = memory

a1.channels.channelUdp.capacity = 1000

a1.channels.channelUdp.transactionCapacity = 100



# Bind the UDP source and sink to the UDP channel

a1.sources.sourceUdp.channels = channelUdp

a1.sinks.sinkUdp.channel = channelUdp
 
設定ファイルは、Flumeディレクトリ配下のconfディレクトリにflume.confという名前で保存します。
Flumeの起動コマンドは以下の通りです。
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1