Windowsの下でflume 1.8をインストールして実行します

8271 ワード

一、インストール環境とソフトウェア
1、flumeの実行にはjava環境が必要なので、flumeを実行する前にjava環境をインストールして構成する必要があります(jdkがインストールされているが、jdk環境が構成されていない場合、flumeを実行するとjava.exeが見つからないエラーが投げ出されます).
2、flume 1.8 javaランタイム環境java 1.8以上が必要です.
3、flume 1.8をダウンロードし、アドレスをダウンロードする:http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz; flumeをダウンロードするのはtarですから.gz、直接解凍すればいいです.4、FLUME_の構成HOME環境変数二、運転
1.flumeのconfディレクトリの下でプロファイルを作成する:example.conf
#flume-to-spark-push.conf: A single-node Flume configuration #Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1
#Describe/configure the source#Flume Sourceカテゴリをnetcatに設定し、node 3の33333ポートにバインドする#telnet node 3 333333コマンドでFlume Sourceにメッセージa 1を送信することができる.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 33333
#Describe the sink#Flume Sinkカテゴリはavroに設定、node 2にバインドされた44444ポート#Flume Sourceは収集するメッセージをFlume Sinkに集約した後、Sinkはnode 2の4444444ポート#Spark Streamingプログラムにメッセージをプッシュしてnode 2の44444ポートを傍受し続け、メッセージが到着するとSpark Streamingアプリケーションに引き取られて処理a 1を行う.sinks.k1.type = avro a1.sinks.k1.hostname = localhost a1.sinks.k1.port = 44444 #Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 1000000
a1.sources.r1.channels = c1 a1.sinks.k1.channel=c 1 2、powershellでコマンドを実行する:
flume-ng agent -conf-file %FLUME_HOME%\conf\example.conf -name a1 

注意:Linuxのコマンドとの違い---かつ-Dflumeに置き換える.root.logger=INFO、consoleはwindowでサポートされていませんLinuxで情報を印刷したい場合は-property「flume.root.logger=INFO、console」に変更する必要があります.
%FLUME_を認識できない場合HOME%は物理パスに切り替わる問題:このコマンドを実行するとまず実行できないと報告されます.ps 1ファイルのエラー時にset-executionpolicy-executionpolicy unrestrictedを実行すると、システムのセキュリティが低下して実行できる.ps 1ファイルの詳細については、以下を参照してください.https://www.cnblogs.com/bonelee/p/8043421.html次の図のように正常に動作しました:windows下安装运行flume 1.8_第1张图片
三、テスト運転cmd:telnet localhost 3333333後、Ctrl+]キーを押して、車に戻ると、入力が正常に表示されます.データの受信者はsparkstreamingプログラムを作成して検証することができます.
    package sparkstreaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume._
    
    object FlumeEventCount {
      def main(args: Array[String]) {
        val host = "localhost"
        val port = 44444
    
        // Create the context and set the batch size
        val conf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
        //          。   ERROR ,  flume    ,         
        ssc.sparkContext.setLogLevel("ERROR")
    
        // Create a flume stream
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    
        // Print out the count of events received from this server in each batch
        stream.map(x=>new String(x.event.getBody.array())
        ).flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }