Spark Streaming統合flume(一)push方式
4979 ワード
一:streaming統合flume 1)クラスタサーバmasterのapache-flume/confにflume-push-streamingを追加する.confプロファイル
2)依存pomを追加する.xml
3)ローカルideaプログラムにおけるFlumePushWC.scala
(4)ローカルテスト、ideaプログラムを先に実行する
(5)flumeの起動
(6)クラスタmasterマシンに入力:次にデータを入力し,ローカルideaオブザーバ
二:クラスタ上でstreaming統合flumeをテストする
(1)ローカルプログラムをwinにパッケージしてこのプロジェクトディレクトリに入る
(2)ローカルのjarをクラスタにコミットする(クラスタをコミットするのをためらうのはコードしかコミットできないが、中にはspark-streaming-flume.jar新聞がないので、2つの方法でコミットする)第1の方法
第2の方法:spark-streaming-flumeをローカルでダウンロードする.jar
(3)マスター下flume起動
(4)masterで起動し、データを入力telnet localhost 4444444
# Name the components on this agent
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.type = netcat
# telnet localhost 44444( , )
simple-agent.sources.netcat-source.bind = localhost
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.avro-sink.type = avro
# idea , idea 0.0.0.0( )
simple-agent.sinks.avro-sink.hostname=192.168.1.125
simple-agent.sinks.avro-sink.port=41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
2)依存pomを追加する.xml
org.apache.spark
spark-streaming-flume_2.11
2.0.2
3)ローカルideaプログラムにおけるFlumePushWC.scala
package com.streaming.flume
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Spark Streaming Flume
*/
object FlumePushWordCount {
def main(args: Array[String]): Unit = {
// , idea
// if(args.length != 2) {
// System.err.println("Usage: FlumePushWordCount ")
// System.exit(1)
// }
//
// val Array(hostname, port) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//TODO... SparkStreaming Flume
val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 41414)
flumeStream.map(x=> new String(x.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
(4)ローカルテスト、ideaプログラムを先に実行する
(5)flumeの起動
bin/flume-ng agent --conf conf --conf-file ./conf/flume-push-streaming.conf --name simple-agent -Dflume.root.logger=INFO,console
(6)クラスタmasterマシンに入力:次にデータを入力し,ローカルideaオブザーバ
telnet localhost 44444
二:クラスタ上でstreaming統合flumeをテストする
(1)ローカルプログラムをwinにパッケージしてこのプロジェクトディレクトリに入る
mvn clean package -DskipTests
package com.streaming.flume
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Spark Streaming Flume
*/
object FlumePushWordCount {
def main(args: Array[String]): Unit = {
if(args.length != 2) {
System.err.println("Usage: FlumePushWordCount ")
System.exit(1)
}
val Array(hostname, port) = args
val sparkConf = new SparkConf()//.setMaster("local[2]").setAppName("FlumePushWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//TODO... SparkStreaming Flume
val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
flumeStream.map(x=> new String(x.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
(2)ローカルのjarをクラスタにコミットする(クラスタをコミットするのをためらうのはコードしかコミットできないが、中にはspark-streaming-flume.jar新聞がないので、2つの方法でコミットする)第1の方法
spark-submit \
--class com.streaming.flume.FlumePushWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.0.2 \
/home/wl/miooc/streaming/flume/SparkStreaingTest-1.0.jar
localhost 41414
第2の方法:spark-streaming-flumeをローカルでダウンロードする.jar
spark-submit \
--class com.streaming.flume.FlumePushWordCount \
--master local[2] \
--jars /usr/local/src/spark-hadoop2.0.2/jars/spark-streaming-flume-assembly_2.11-2.2.0.jar \
/home/wl/miooc/streaming/flume/SparkStreaingTest-1.0.jar
localhost 41414
(3)マスター下flume起動
bin/flume-ng agent --conf conf --conf-file ./conf/flume-push-streaming.conf --name simple-agent -Dflume.root.logger=INFO,console
# Name the components on this agent
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = localhost
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname=localhost
simple-agent.sinks.avro-sink.port=41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
(4)masterで起動し、データを入力telnet localhost 4444444