IPアドレスに基づいて帰属地を計算する

2565 ワード

package utils

import java.sql.DriverManager

object UtilsDemo {
  /**
    * Converts a dotted IPv4 address string into its numeric value.
    *
    * Each dot-separated fragment is parsed as a number and shifted in 8 bits at a time,
    * so "1.2.3.4" becomes 16909060. The result is held in a Long so addresses above
    * 127.255.255.255 stay non-negative.
    *
    * @param ip dotted IPv4 address such as "192.168.0.1"
    * @return the numeric address
    * @throws NumberFormatException if a fragment is not a number
    */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, fragment) => (acc << 8L) | fragment.toLong)

  /**
    * Binary-searches `lines` for the range that contains `ip`.
    *
    * `lines` must be sorted ascending by start address, and its ranges must not overlap.
    * Otherwise a containing range may be missed.
    *
    * @param lines (start, end, province) ranges; both bounds are inclusive
    * @param ip    numeric address, e.g. as produced by [[ip2Long]]
    * @return index of the range containing `ip`, or -1 if there is none
    */
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    @scala.annotation.tailrec
    def loop(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        // Unsigned shift keeps the midpoint correct even if (low + high) overflows Int.
        val middle = (low + high) >>> 1
        val (start, end, _) = lines(middle)
        if (ip < start) loop(low, middle - 1)
        else if (ip > end) loop(middle + 1, high)
        else middle
      }

    loop(0, lines.length - 1)
  }

  /**
    * Inserts one partition of (province, count) records into the MySQL table `access_log`.
    *
    * A single connection and prepared statement are shared by all records in `part`.
    * Both are always closed, statement first, even if an insert fails.
    *
    * @param part     (province, count) records to insert
    * @param url      JDBC URL of the target database
    * @param user     database user name
    * @param password database password
    */
  def data2Mysql(
      part: Iterator[(String, Int)],
      url: String = "jdbc:mysql://localhost:3306/bigdata?serverTimezone=Asia/Shanghai&useSSL=false",
      user: String = "root",
      password: String = "123456"
  ): Unit = {
    // NOTE(review): the credential defaults are hard-coded; supply real ones from configuration.
    // Empty partitions have nothing to write, so don't open a connection for them.
    if (part.hasNext) {
      val conn = DriverManager.getConnection(url, user, password)
      try {
        val st = conn.prepareStatement("insert into access_log values(?,?)")
        try {
          part.foreach { case (province, count) =>
            st.setString(1, province)
            st.setInt(2, count)
            st.executeUpdate()
          }
        } finally {
          st.close()
        }
      } finally {
        conn.close()
      }
    }
  }

}
package IP

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import utils.UtilsDemo

/**
  * Counts access-log records per province and writes the totals to MySQL.
  * Each record's IP address is looked up in a table of IP ranges.
  *
  * Usage: IpTest <ipRulesPath> <accessLogPath>
  *  - ipRulesPath: '|'-separated lines. 0-based field 2 is the range start (numeric),
  *    field 3 is the range end and field 6 is the province.
  *  - accessLogPath: '|'-separated lines. 0-based field 1 is the client IP in dotted form.
  */
object IpTest {
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "Usage: IpTest <ipRulesPath> <accessLogPath>")

    // Use local mode only when no master was supplied externally (e.g. by spark-submit).
    val conf = new SparkConf().setAppName("IpTest").setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(conf)
    try {
      // (start, end, province) ranges, collected to the driver. binarySearch requires them
      // ordered by start address, so sort explicitly instead of trusting file order.
      val ipRules: Array[(Long, Long, String)] = sc.textFile(args(0))
        .map { line =>
          val fields = line.split("[|]")
          (fields(2).toLong, fields(3).toLong, fields(6))
        }
        .collect()
        .sortBy(_._1)

      // Broadcast the rule table so executors share one read-only copy.
      val broadcast: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipRules)

      val ipToProvince = sc.textFile(args(1)).map { line =>
        val fields = line.split("[|]")
        val ipNum = UtilsDemo.ip2Long(fields(1))
        val rules = broadcast.value
        val index = UtilsDemo.binarySearch(rules, ipNum)
        // NOTE(review): key for IPs outside every range. The literal appears to be a label
        // that was stripped in the source text; confirm the intended value.
        val province = if (index != -1) rules(index)._3 else "    "
        (province, 1)
      }

      // Per-province totals, largest first.
      val result = ipToProvince.reduceByKey(_ + _).sortBy(_._2, ascending = false)
      // result.collect().foreach(println(_))

      // One database connection per partition rather than per record.
      result.foreachPartition(part => UtilsDemo.data2Mysql(part))
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}