Spark で業務データを処理し、結果を MySQL に書き込む
20645 ワード
データサンプル
ip.txt
ip.log
ip.log 中の各 IP を ip.txt の IP 範囲と照合して帰属地（省）を特定し、省ごとの件数を MySQL に書き込む。
Sparkコード
ip.txt
1.0.1.0|1.0.3.255|16777472|16778239| | | | || |350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311| | | | || |440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599| | | | || |440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007| | | | || |350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799| | | | || |350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135| | | | || |440100|China|CN|113.280637|23.125178
1.2.0.0|1.2.1.255|16908288|16908799| | | | || |350100|China|CN|119.306239|26.075302
ip.log
20090121001933462849000|123.197.57.136|cache.tv.qq.com|/v8/live.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)||pvid=97224176; flv=9.0; icache=EFD@DAFL@; pt2gguin=o0120405284; ptcz=14c914fe9bd53c38f438aafe542b9759ec57bd8e43449b5d6e2e0f664cca84c9; SortType=1; aduid=eWpJenMS; r_cookie=9154378587; o_cookie=120405284; suid=451154856; __utma=136017777.2691260586985878000.1231497979.1231826172.1231842013.4; __utmz=136017777.1231842013.4.2.utmcsr=shuqian.qq.com
20090121001933705364000|115.120.10.152|www.f130.com|/song/6974/88207.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)|http://www.f130.com/musiclist88/singer2047.htm|__utma=261378458.884122920.1231756328.1232415113.1232465640.6; __utmz=261378458.1232465640.6.5.utmccn=(referral)
20090121001934312761000|115.120.30.116|www.56.com|/u51/v_NDA3MTEzOTI.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)|http://so.56.com/index?key=%CF%B2%D1%F2%D1%F2%D3%EB%BB%D2%CC%AB%C0%C7+%C5%A3%C6%F8%B3%E5%CC%EC|geoip= -; whistoryview=38791745-40659093-40711596-40711572-40711514-40693155-40693140-40704681-40704663-40704659-38918964-40688916-40703735-40686314-40561191-40561202-40278125-40363298-40363282-40278150; pass_hex=a0725e776c7d6b72ea9714667d3c8e5f; member_login=fab76622eed31d1085a0814bf1eeaf6c; host=m49.56.com; wl_all_s=y
20090121001934691779000|115.120.10.222|www.finaleden.com|/Cartoon/Games-59/Comics-59905_cc7072ce_page-2.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; CIBA)||
20090121001935224842000|117.75.234.44|mag.xunlei.com|/newyear/newyear.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)||__utma=166345655.3658587124681420000.1222262534.1232443721.1232445423.6; __utmz=166345655.1232445427.6.6.utmcsr=recommend.xunlei.com
20090121001935241752000|222.55.71.29|www.swode.com|/r/459c.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)||_visitor=42c55b20-385e-102c-bffa-0015c5898473; _wid=1; _cid=22; cnzz_a1215756=1; vw1215756=%3A44215580%3A44211895%3A; sin1215756=; rtime=0; ltime=1232468365671; cnzz_eid=42673776-1232468355-
ip.log 中の各 IP を ip.txt の IP 範囲と照合して帰属地（省）を特定し、省ごとの件数を MySQL に書き込む。
Sparkコード
import java.sql.{Connection, DriverManager, PreparedStatement}

import scala.annotation.tailrec
import scala.util.control.NonFatal

import org.apache.spark.rdd.RDD
object ipWork {

  /** Label returned by [[binarySearch]] when no range contains the IP. */
  private val NotFound = " "

  /**
   * Converts a dotted IPv4 string (e.g. "1.0.1.0") to its numeric value.
   *
   * Each dot-separated fragment is shifted in 8 bits at a time, so "1.0.1.0" becomes
   * 1 * 2^24 + 1 * 2^8 = 16777472, the same encoding as columns 2/3 of ip.txt.
   *
   * @param ip dotted IPv4 address
   * @return the address as a Long
   * @throws NumberFormatException if a fragment is not a number
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, fragment) => fragment.toLong | (acc << 8L))

  /**
   * Binary-searches `lst` for the range that contains `logIp`.
   *
   * Each element is (rangeStart, rangeEnd, label), with both bounds inclusive. `lst` must be
   * sorted by rangeStart and its ranges must not overlap, otherwise matches can be missed.
   *
   * @param lst   sorted IP ranges
   * @param logIp numeric IP to look up
   * @return the label of the matching range, or " " when no range contains `logIp`
   */
  def binarySearch(lst: Array[(Long, Long, String)], logIp: Long): String = {
    @tailrec
    def loop(lo: Int, hi: Int): String =
      if (lo > hi) NotFound
      else {
        val mid = lo + (hi - lo) / 2
        val (start, end, label) = lst(mid)
        if (logIp >= start && logIp <= end) label
        else if (logIp > end) loop(mid + 1, hi)
        else loop(lo, mid - 1)
      }

    loop(0, lst.length - 1)
  }

  /**
   * Entry point: counts ip.log hits per province (resolved through ip.txt) and writes the
   * counts to MySQL.
   *
   * @param args exactly two paths: args(0) = ip.txt (IP ranges), args(1) = ip.log (access log)
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: ipWork <ip.txt path> <ip.log path>")
      sys.exit(1)
    }
    val Array(input, input2) = args

    val sc = sparkUtils.getSC.getSparkContext(this.getClass.getSimpleName)
    try {
      // Ship the (small) range table to each executor once instead of once per task closure.
      val ipRangesBc = sc.broadcast(loadIpRanges(sc.textFile(input)))

      // Column 1 of each ip.log line is the client IP.
      val provinces: RDD[String] = sc.textFile(input2).map { line =>
        binarySearch(ipRangesBc.value, ip2Long(line.split("\\|")(1)))
      }

      val res: RDD[(String, Int)] = provinces.map((_, 1)).reduceByKey(_ + _).sortByKey()
      res.foreachPartition(rows => writePartition(rows))
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }

  /**
   * Parses ip.txt lines into (rangeStart, rangeEnd, label) tuples, sorted by rangeStart as
   * [[binarySearch]] requires.
   *
   * Columns 2 and 3 hold the numeric range bounds. Column 6 is taken as the label.
   * NOTE(review): column 6 is presumably the province, but it is blank in the sample rows
   * shown here, so confirm against the real file.
   */
  private def loadIpRanges(ipData: RDD[String]): Array[(Long, Long, String)] =
    ipData
      .map { line =>
        val fields = line.split("\\|")
        (fields(2).toLong, fields(3).toLong, fields(6))
      }
      .collect()
      .sortBy(_._1)

  /**
   * Writes one partition of (province, count) pairs into the MySQL table `province`.
   *
   * Creates the table first if needed, then sends all rows as one JDBC batch. Failures are
   * printed and swallowed, as before, so the job keeps the original best-effort semantics.
   */
  private def writePartition(rows: Iterator[(String, Int)]): Unit = {
    val url = "jdbc:mysql://localhost:3306/gmjsqoop?characterEncoding=utf-8"
    var conn: Connection = null
    var createStmt: PreparedStatement = null
    var insertStmt: PreparedStatement = null
    try {
      conn = DriverManager.getConnection(url, "xxx", "xxx")
      createStmt = conn.prepareStatement("create table if not exists province (province varchar(20),cnts int)")
      createStmt.execute()
      insertStmt = conn.prepareStatement("insert into province values(?,?)")
      rows.foreach { case (pro, cnt) =>
        insertStmt.setString(1, pro)
        insertStmt.setInt(2, cnt)
        insertStmt.addBatch()
      }
      // One round-trip for the whole partition instead of one execute() per row.
      insertStmt.executeBatch()
    } catch {
      case NonFatal(e) => e.printStackTrace()
    } finally {
      // Close in reverse order of creation: statements before the connection that owns them.
      closeQuietly(insertStmt)
      closeQuietly(createStmt)
      closeQuietly(conn)
    }
  }

  /** Closes `resource` if non-null; a failing close is reported but does not stop later closes. */
  private def closeQuietly(resource: AutoCloseable): Unit =
    if (resource != null) {
      try resource.close()
      catch { case NonFatal(e) => e.printStackTrace() }
    }
}