flume+kafka+stormの統合を簡単にテスト

12093 ワード

Flume/kafka/stormを統合する方法は、ログファイルを収集するために導入され、最終的にstormにログを転送して分析します.stormの解析方法は後述するが,ここでは集積方法のみを論じる.以下は具体的な手順とテスト方法である:1.各サーバでzookeeper/kafka/stormをそれぞれ起動し、[hadoop@master apache-flume-1.5.2-bin]$ jps1926 QuorumPeerMain3659 Kafka3898 Jps3787 core3726 nimbus3838 supervisor[hadoop@slave1 kafka_2.9.2-0.8.1.1]$jps 16068 Kafka 5637 DataNode 16192 Jps 16135 supervisor 7851 QuarumPeerMaihttp://10.9.16.91:8080/index.htmlstormの情報を表示できます.2.Kafkaトピック(Topic)をKafkaディレクトリの下に作成します:$bin/kafka-topics.sh--create--zookeeper 10.9.16.91:2181--replication-factor 3--partitions 1--topic mykafkaステータスを表示:[hadoop@master kafka_2.9.2-0.8.1.1]$ bin/kafka-topics.sh--describe--zookeeper 10.9.16.91:2181 Topic:mykafka PartitionCount:1 ReplicationFactor:1 Configs:Topic:mykafka Partition:0 Leader:0 Replicas:0 Isr:0説明:
partition
同じtopicの下で複数のpartitionを設定し、topicの下のmessageを異なるpartitionの下に格納し、並列性を向上させることを目的としています.
leader
このpartitionの読み書き操作を担当し、各brokerがpartitionのリーダーになる可能性があります.
replicas
コピー、すなわちこのpartitionは、brokerが生存しているかどうかにかかわらず、どのbrokerにバックアップがありますか?
isr
生き残ったreplicas
Kafkaのテストに成功したかどうか:slave 1で、生産者を開始し、いくつかのデータを書き込みます:[hadoop@slave1 kafka_2.9.2-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list master:9092 --sync --topic testtopicSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.dsadfasdfdfasdf^H^H中国語のバーマスターで、消費者が対応する情報を受け取ることができるかどうか:[hadoop@master kafka_2.9.2-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic testtopic --from-beginningSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". ♪slf 4 j*.JArはkafkaの下のlibの下に入れて、このエラーSLF 4 J:Defaulting to no-operation(NOP)logger implementationSLF 4 J:Seeを提示しませんhttp://www.slf4j.org/codes.html#StaticLoggerBinder for further details.jf dskkdsadfasdfdfasdf中国語のバー以上は正常です.3.KafkaとStormの統合:Kafka Spouttest.JAvaファイルの内容:
package cn.logme.storm.kafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
public class KafkaSpouttest implements IRichSpout {
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
 
    public KafkaSpouttest() {}
     
    public KafkaSpouttest(String topic) {
        this.topic = topic;
    }
 
    public void nextTuple() {    }
 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
 
    public void ack(Object msgId) {    }
 
    public void activate() {         
        consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); 
        Map<String,Integer> topickMap = new HashMap<String, Integer>();  
        topickMap.put(topic, 1);  
 
        System.out.println("*********Results********topic:"+topic);  
 
        Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap=consumer.createMessageStreams(topickMap);  
        KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);  
        ConsumerIterator<byte[],byte[]> it =stream.iterator();   
        while(it.hasNext()){  
             String value =new String(it.next().message());
             SimpleDateFormat formatter = new SimpleDateFormat   ("yyyy MM dd  HH:mm:ss SSS");  
             Date curDate = new Date(System.currentTimeMillis());//        
             String str = formatter.format(curDate);   
             System.out.println("storm kafka ------->" + value);
             collector.emit(new Values(value,1,str), value);
        }  
    }
     
    private static ConsumerConfig createConsumerConfig() {  
        Properties props = new Properties();  
        //  zookeeper 
        props.put("zookeeper.connect","master:2181,slave1:2181,slave2:2181");  
        //  group id
        props.put("group.id", "1");  
        // kafka group  zookeeper ,  zookeeper ,  
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms","10000");  
        return new ConsumerConfig(props);  
    }  
 
    public void close() {    }
 
    public void deactivate() {    }
 
    public void fail(Object msgId) {    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","id","time"));
    }
 
    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration ");
        topic="testtopic";
        return null;
    }
}

もう一つ作ろうjava:
package cn.logme.storm.kafka;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
 
public class KafkaTopologytest {
 
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpouttest(""), 1);
        builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
        builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",new Fields("word"));
 
        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);
 
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());
         
        //Utils.sleep(1000*60*5); // local cluster test ...
        //cluster.shutdown(); // 
    }
     
    public static class Bolt1 extends BaseBasicBolt {
        
        private static final long serialVersionUID = 1L;
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                int id = input.getInteger(1);
                String time = input.getString(2);
                msg = msg+"bolt1";
                System.out.println(" 1 -------[arg0]:"+ msg +"---[arg1]:"+id+"---[arg2]:"+time+"------->"+msg);
                if (msg != null) {
                    collector.emit(new Values(msg));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
  
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
     
    public static class Bolt2 extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;
        Map<String, Integer> counts = new HashMap<String, Integer>();
  
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String msg = tuple.getString(0);
            msg = msg + "bolt2";
            System.out.println(" 2 ---------->"+msg);
            collector.emit(new Values(msg,1));
        }
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}

以上をKafka_としてエクスポートStorm.JArファイルをサーバに配置します.(私はここではmvn方式を使用しません)Kafkaの中のいくつかのjarパッケージをstormに入れて、主にkafka_*.jar, metrics-core*.jar, scala-library*.JAr,実行コマンド:$/home/hadoop/apache-storm-0.9.3/bin/storm jar~/Kafka_Storm.jar cn.logme.storm.kafka.KafkaTopologytest4. Flameディレクトリの下でFlameを起動する:プロファイルを作成する:(flumeng-kafka-plugin.jarというファイルをlibディレクトリに先に置く)
[hadoop@master apache-flume-1.5.2-bin]$ vi conf/kafka-flume.properties 

producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.channels = c
producer.sources.s.type= exec
producer.sources.s.command = tail -f -n+1 /home/hadoop/logs.log
#####producer.sources.s.bind= 10.9.16.91
#####producer.sources.s.port= 44444

############producer.sinks.r.type = cn.logme.storm.kafka.KafkaSink
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=10.9.16.91:9092
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.custom.topic.name=testtopic
producer.sinks.r.channel = c

producer.channels.c.type = memory
producer.channels.c.capacity = 1000

また、kafka下libsの3つのファイル:kafka-*を必要とするか否かによって異なる.jar   scala-library-*.jar   metrics-core-*.jar  flumeng-kafka-plugin.JArはflumeのlibの下にコピーします.(注:KafkaSinkはflume-pluginに組み込まれており、jarパッケージに独自にコンパイルする必要はありません)クライアントを起動します:$bin/flume-ng agent--conf--conf-file conf/kafka-flume.properties --name producer -Dflume.root.logger=INFO,console5.コマンドおよびテストの実行:順序を実行する場合は、zookeeper/storm(nimbus/ui/supervisor)を起動し、kafka-server-start、kafka-console-producer、kafka-console-producer、storm jar結果コマンドセット:1.#予備2.bin/flume-ng agent -c conf -f conf/kafka-flume.properties -n producer -Dflume.root.logger=INFO,console3. apache-storm-0.9.3/bin/storm jar Kafka_Storm.jar cn.logme.storm.kafka.KafkaTopologytest4. bin/kafka-server-start.sh config/server.properties5. bin/kafka-console-producer.sh --broker-list 10.9.16.91:9092 --topic testtopic6. bin/kafka-console-consumer.sh --zookeeper 10.9.16.91:2181 --topoic testtopic --from-beginnign7.(92)bin/kafka-server-start.sh kafka_2.9.2-0.8.1.1/config/server.properties6.統合のテスト:[hadoop@master~]$echo"4444 444 44 44433はどこで行けばいいのか22222222">>logs.ロゴは各端末に表示され、Flumeを経てデータが生成され、kafkaの生産者、消費者に処理され、最終的にはStormによって表示される.問題のトラブルシューティング:a、jarロードエラーが発生しました:対応するjarパッケージを対応するlibフォルダにコピー(できればソフトチェーン)します.b、stormとkafkaの統合時にstorm jarコマンドプロンプト:backtypeを実行する.storm.util - Async loop died!java.lang.NoClassDefFoundError:org/apache/zookeeper/Watcher解決:zookeeperディレクトリの下のzookeeper-3.4.5.JArをStormのlibディレクトリにコピーすると、正常になります.c、kafkaはbin/kafka-console-consumerを起動する.sh--zookeeper 10.9.16.91:2181--topic testtopicの場合、ヒント:SLF 4 J:Failed to load class"org.slf 4 j.impl.StaticLoggerBinder".解決:http://www.slf4j.org/download.html対応するパッケージをダウンロードします.デフォルトのflumeに含まれるslf 4 j-log 4 j 12-1.6.1.JArファイル、後の「12」は1.2版を表し、「1.6」のJAVAに基づいている.
 
 
このリンクは次のとおりです.http://www.logme.cn/blog/29/test_flume+kafka+storm/