Spark StreamingはFlume,Mysql(FlumeベースのPullモード)を統合し,Mysqlにリアルタイムでデータを保存する
4832 ワード
クラスタの割り当ては次のとおりです.
flumeの構成は次のとおりです.
Spark Streamingプログラム
pom.xmlファイル
必要なjarパッケージ:spark-streaming-flume-sink_2.10-2.1.0.JArはクラスタで実行するにはspark-streaming-flume-sink_2.10-2.1.0.JArはFlumeのlibディレクトリの下にコピーし、SparkのjarパッケージをFlumeのlibディレクトリの下にコピーする必要がある.またmysqlドライバパッケージが必要で、クラスタの各マシンのSparkのjarsディレクトリの下でテストする:1.まずFlume 2を起動する必要がある.Spark Streamingプログラム3を起動します.ログファイルを/opt/kevin/logにコピー注意:問題をシーケンス化し、起動順序
192.168.58.11 spark01
192.168.58.12 spark02
192.168.58.13 spark03
spark :spark-2.1.0-bin-hadoop2.7
flume :apache-flume-1.7.0-bin
flumeの構成は次のとおりです.
#flume
#bin/flume-ng agent -n a1 -f conf/a1.conf -c conf -Dflume.root.logger=INFO,console
a1.channels = c1
a1.sinks = k1
a1.sources = r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/kevin/log
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.58.11(IP )
a1.sinks.k1.port = 1234( )
# source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Spark Streamingプログラム
package com.kk.sparkstreaming.flume
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.StorageLevels
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
object FlumePull {
def main(args: Array[String]): Unit = {
//
// Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
//Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// StreamingContext StreamingContext(conf: SparkConf, batchDuration: Duration)
//Duration
val sparkConf = new SparkConf().setAppName("FlumePull")
val sparkStream = new StreamingContext(sparkConf, Seconds(3)); // 3
// pull
val flumeDStream = FlumeUtils.createPollingStream(sparkStream, "192.168.58.11",1234, StorageLevels.MEMORY_AND_DISK_SER_2)
// Flume :
//e FLume Event
val data = flumeDStream.map { e =>
{
new String(e.event.getBody.array());
}
}
data.print()
val datas = data.map(line => {
val index: Array[String] = line.split(",");
val con = index(1);
(con, 1)
})
datas.print()
datas.foreachRDD(cs => {
var conn: Connection = null;
var ps: PreparedStatement = null;
try {
Class.forName("com.mysql.jdbc.Driver").newInstance();
cs.foreachPartition(f => {
conn = DriverManager.getConnection("jdbc:mysql://192.168.58.14:3306/storm?useUnicode=true&characterEncoding=utf8", "root", "kevin");
ps = conn.prepareStatement("insert into result values(?,?)");
f.foreach(s => {
ps.setString(1, s._1);
ps.setInt(2, s._2);
ps.executeUpdate();
})
})
} catch {
case t: Throwable => t.printStackTrace() // TODO: handle error
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close();
}
}
})
sparkStream.start()
sparkStream.awaitTermination();
}
}
pom.xmlファイル
UTF-8
2.2.1
2.11.1
org.scala-lang
scala-library
${scala.version}
org.scala-lang
scala-compiler
${scala.version}
org.scala-lang
scala-reflect
${scala.version}
org.apache.spark
spark-core_2.11
${spark.version}
org.apache.spark
spark-streaming_2.11
${spark.version}
org.apache.spark
spark-sql_2.11
${spark.version}
org.apache.spark
spark-streaming-flume_2.11
2.2.2
必要なjarパッケージ:spark-streaming-flume-sink_2.10-2.1.0.JArはクラスタで実行するにはspark-streaming-flume-sink_2.10-2.1.0.JArはFlumeのlibディレクトリの下にコピーし、SparkのjarパッケージをFlumeのlibディレクトリの下にコピーする必要がある.またmysqlドライバパッケージが必要で、クラスタの各マシンのSparkのjarsディレクトリの下でテストする:1.まずFlume 2を起動する必要がある.Spark Streamingプログラム3を起動します.ログファイルを/opt/kevin/logにコピー注意:問題をシーケンス化し、起動順序