IPアドレスに基づいて帰属地を計算する
2565 ワード
package utils
import java.sql.DriverManager
object UtilsDemo {

  /**
   * Converts a dotted IPv4 address into its numeric value so it can be compared
   * against the numeric ranges of the IP rule table.
   *
   * Fragments are taken most-significant first, each shifted in by 8 bits:
   * "1.2.3.4" becomes 1 * 2^24 + 2 * 2^16 + 3 * 2^8 + 4 = 16909060.
   *
   * @param ip dotted-decimal address, e.g. "1.2.3.4"
   * @return the address as a Long
   * @throws NumberFormatException if a fragment is not a valid number
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, fragment) => (acc << 8) | fragment.toLong)

  /**
   * Binary-searches `lines` for the range containing `ip`.
   *
   * Each element is (rangeStart, rangeEnd, label) with inclusive bounds. The
   * array must be sorted ascending by rangeStart with non-overlapping ranges,
   * otherwise matches can be missed.
   *
   * @param lines sorted range table
   * @param ip    numeric IP, e.g. produced by [[ip2Long]]
   * @return index of the matching range, or -1 if no range contains `ip`
   */
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    var found = -1
    while (found == -1 && low <= high) {
      // low + (high - low) / 2 avoids Int overflow of (low + high).
      val middle = low + (high - low) / 2
      val (rangeStart, rangeEnd, _) = lines(middle)
      if (ip < rangeStart) high = middle - 1
      else if (ip > rangeEnd) low = middle + 1
      else found = middle
    }
    found
  }

  /**
   * Inserts every (key, count) pair of a partition into the MySQL table
   * `access_log`, one INSERT per row.
   *
   * The connection and statement are always released, even when an insert
   * fails. Empty partitions return without opening a connection.
   *
   * NOTE(review): the JDBC URL and credentials are hard-coded; consider moving
   * them to configuration.
   *
   * @param part rows of one RDD partition, e.g. (province, hitCount)
   */
  def data2Mysql(part: Iterator[(String, Int)]): Unit = {
    if (part.hasNext) {
      val conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/bigdata?serverTimezone=Asia/Shanghai&useSSL=false",
        "root",
        "123456")
      try {
        val st = conn.prepareStatement("insert into access_log values(?,?)")
        try {
          part.foreach { case (key, count) =>
            st.setString(1, key)
            st.setInt(2, count)
            st.executeUpdate()
          }
        } finally {
          // Close the statement before the connection that owns it.
          st.close()
        }
      } finally {
        conn.close()
      }
    }
  }
}
package IP
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import utils.UtilsDemo
/**
 * Spark job that counts access-log hits per province. It looks up each
 * request's IP in a table of numeric IP ranges, then writes the per-province
 * counts to MySQL via [[utils.UtilsDemo.data2Mysql]].
 *
 * Usage: IpTest <ipRulesPath> <accessLogPath>
 *  - ipRulesPath: '|'-separated lines; 0-based fields 2 and 3 are the numeric
 *    range start/end and field 6 is the province.
 *  - accessLogPath: '|'-separated lines; 0-based field 1 is the dotted client IP.
 */
object IpTest {
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "Usage: IpTest <ipRulesPath> <accessLogPath>")

    // setIfMissing keeps local[*] as the default for IDE runs while still letting
    // spark-submit --master pick a cluster (setMaster would override it).
    val conf = new SparkConf().setAppName("IpTest").setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(conf)
    try {
      // IP rule table: (rangeStart, rangeEnd, province), collected to the driver.
      val ipRules = sc.textFile(args(0)).map { line =>
        val fields = line.split("[|]")
        (fields(2).toLong, fields(3).toLong, fields(6))
      }
      // binarySearch requires ranges ordered by start; sort here instead of
      // relying on the input file already being sorted.
      val sortedRules: Array[(Long, Long, String)] = ipRules.collect().sortBy(_._1)
      val broadcast: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(sortedRules)

      val provinceHits = sc.textFile(args(1)).mapPartitions { logLines =>
        // Read the broadcast on the executor, once per partition. Capturing the
        // array itself would ship it with every task instead.
        val rules = broadcast.value
        logLines.map { line =>
          val ipNum = UtilsDemo.ip2Long(line.split("[|]")(1))
          val index = UtilsDemo.binarySearch(rules, ipNum)
          // IPs that fall outside every range are counted under a single-space key.
          val province = if (index != -1) rules(index)._3 else " "
          (province, 1)
        }
      }

      // NOTE(review): this sort does not determine row order in MySQL, because
      // partitions are written independently; drop it if nothing else needs it.
      val result = provinceHits.reduceByKey(_ + _).sortBy(_._2, ascending = false)
      result.foreachPartition(part => UtilsDemo.data2Mysql(part))
    } finally {
      sc.stop()
    }
  }
}