Sparkノート：Sparkによるクリックストリームログ分析の事例
2570 ワード
1. PV（ページビュー数）の集計
2. UV（ユニークビジター数）の集計
3. アクセス数TopN（参照元URL別）の集計
package cn.itcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Page-view (PV) job: every non-blank line of the access log counts as one request.
 */
object PV {

  /**
   * Reads the access log, counts its non-blank lines and prints `(pv,<count>)`
   * on the driver.
   *
   * @param args optional; `args(0)` overrides the input log path
   *             (default `d:\data\access.log`).
   */
  def main(args: Array[String]): Unit = {
    // setIfMissing keeps local[2] as the default for IDE runs, but lets
    // `spark-submit --master ...` take effect. A hard-coded setMaster would
    // override the submit-time master.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("PV")
      .setIfMissing("spark.master", "local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      val input: String = args.lift(0).getOrElse("d:\\data\\access.log")
      val file: RDD[String] = sc.textFile(input)
      // count() is a single action with no shuffle, and it yields 0 for an
      // empty file. The previous map(("pv",1)).reduceByKey printed nothing
      // in that case. A blank line is not a request, so it is skipped.
      val totalPV: Long = file.filter(_.trim.nonEmpty).count()
      // Printed on the driver, so the result is visible even in cluster mode.
      // RDD.foreach(println) would print on executors instead.
      println(("pv", totalPV))
    } finally {
      // Always release the context, even if reading or counting fails.
      sc.stop()
    }
  }
}
2. UV（ユニークビジター数）の集計
package cn.itcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Unique-visitor (UV) job. A visitor is identified by the first
 * space-separated field of each log line (the client IP).
 */
object UV {

  /**
   * Counts distinct first fields, prints `(UV,<count>)` on the driver and
   * writes the same tuple to a text output directory.
   *
   * @param args optional; `args(0)` overrides the input path
   *             (default `d:\data\access.log`), and `args(1)` overrides the
   *             output directory (default `d:\data\out`).
   *             saveAsTextFile refuses to write into an existing directory,
   *             so remove it before re-running.
   */
  def main(args: Array[String]): Unit = {
    // setIfMissing: local[2] for IDE runs; spark-submit's --master still wins.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("UV")
      .setIfMissing("spark.master", "local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      val input: String = args.lift(0).getOrElse("d:\\data\\access.log")
      val output: String = args.lift(1).getOrElse("d:\\data\\out")
      val file: RDD[String] = sc.textFile(input)
      // Whitespace-only lines must be dropped. "   ".split(" ") is an EMPTY
      // array (Java removes trailing empty strings), so indexing (0) would
      // throw. A blank line would also count as a bogus "" visitor.
      // The limit of 2 stops splitting after the first field, the only one used.
      val ips: RDD[String] = file
        .filter(_.trim.nonEmpty)
        .map(_.split(" ", 2)(0))
      // Evaluate the lineage once and keep the result on the driver.
      // Previously foreach and saveAsTextFile each re-ran the read and the
      // distinct shuffle.
      val totalUV: (String, Long) = ("UV", ips.distinct().count())
      println(totalUV)
      // One slice -> a single part file containing "(UV,<count>)".
      sc.parallelize(Seq(totalUV), numSlices = 1).saveAsTextFile(output)
    } finally {
      sc.stop()
    }
  }
}
3. アクセス数TopN（参照元URL別）の集計
package cn.itcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* topN
*/
object TopN {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//
val file: RDD[String] = sc.textFile("d:\\data\\access.log")
// , ( URL,1)
val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length>10).map(x=>(x(10),1))
// -->
val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_+_).sortBy(_._2,false)
// take topN, 5
val finalResult: Array[(String, Int)] = result.take(5)
println(finalResult.toBuffer)
sc.stop()
}
}