Sparkノート：Sparkによるクリックストリームログ分析事例

2570 ワード

1. アクセスPV（ページビュー数）
package cn.itcast

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Counts page views (PV): the total number of lines in the access log.
  *
  * Every log line is mapped to the pair ("pv", 1) and the ones are summed by key, so the job
  * prints a single pair ("pv", number-of-lines).
  */
object PV {
  def main(args: Array[String]): Unit = {
    // Application name "PV"; "local[2]" runs Spark locally with two worker threads.
    val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      // One RDD element per line of the access log.
      val file: RDD[String] = sc.textFile("d:\\data\\access.log")
      // Tag every line with the same key so that all lines fall into one group.
      val pvAndOne: RDD[(String, Int)] = file.map(_ => ("pv", 1))
      // Sum the ones under the shared key: the sum is the page-view count.
      val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_ + _)
      // Collect the (at most one) pair to the driver before printing: RDD.foreach runs on the
      // executors, so its println output is only visible on the driver in local mode.
      totalPV.collect().foreach(println)
    } finally {
      // Always release the context, even when the job fails (e.g. the input path is missing).
      sc.stop()
    }
  }
}

2. アクセスUV（ユニークビジター数）

package cn.itcast

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Counts unique visitors (UV): the number of distinct values in the first space-separated
  * field of each access-log line.
  *
  * The result is a single pair ("UV", count); it is printed and also written as text files
  * under d:\data\out.
  */
object UV {
  def main(args: Array[String]): Unit = {
    // Application name "UV"; "local[2]" runs Spark locally with two worker threads.
    val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      // One RDD element per line of the access log.
      val file: RDD[String] = sc.textFile("d:\\data\\access.log")
      // Take the first space-separated field of every line.
      // NOTE(review): presumably this field is the client IP -- confirm against the log format.
      // Lines without a usable first field are skipped: a blank line would otherwise be
      // counted as a visitor named "", and a spaces-only line splits into an empty array,
      // which would make fields(0) throw.
      val ips: RDD[String] = file
        .map(_.split(" "))
        .filter(fields => fields.nonEmpty && fields(0).nonEmpty)
        .map(fields => fields(0))
      // Deduplicate, then tag every distinct value with the same key.
      val uvAndOne: RDD[(String, Int)] = ips.distinct().map(_ => ("UV", 1))
      // Sum the ones under the shared key. The result feeds two actions below (print and
      // save), so cache it instead of recomputing the whole lineage twice.
      val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_ + _).cache()
      // Collect the (at most one) pair to the driver before printing: RDD.foreach runs on the
      // executors, so its println output is only visible on the driver in local mode.
      totalUV.collect().foreach(println)
      // Persist the result. Spark refuses to write into an output directory that already
      // exists, so this call throws on a re-run unless d:\data\out was removed first.
      totalUV.saveAsTextFile("d:\\data\\out")
    } finally {
      // Always release the context, even when the job or the save fails.
      sc.stop()
    }
  }
}

3. アクセスTopN（参照元URLの上位N件）
package cn.itcast

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Prints the five most frequent values of the 11th space-separated field (index 10) of the
  * access log, together with their occurrence counts, in descending order of count.
  *
  * NOTE(review): presumably field 10 is the referrer URL -- confirm against the log format.
  */
object TopN {
  def main(args: Array[String]): Unit = {
    // Application name "TopN"; "local[2]" runs Spark locally with two worker threads.
    val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      // Reduce Spark's own log output.
      sc.setLogLevel("WARN")
      // One RDD element per line of the access log.
      val file: RDD[String] = sc.textFile("d:\\data\\access.log")
      // Keep only lines that actually have a field at index 10, and pair that field with 1.
      val refUrlAndOne: RDD[(String, Int)] =
        file.map(_.split(" ")).filter(_.length > 10).map(fields => (fields(10), 1))
      // Count the occurrences of each value, then order by count, largest first.
      val result: RDD[(String, Int)] =
        refUrlAndOne.reduceByKey(_ + _).sortBy(_._2, ascending = false)
      // take(5) brings the first five pairs of the sorted RDD to the driver.
      val finalResult: Array[(String, Int)] = result.take(5)
      // toBuffer gives a readable element listing (an Array prints only its identity).
      println(finalResult.toBuffer)
    } finally {
      // Always release the context, even when the job fails (e.g. the input path is missing).
      sc.stop()
    }
  }
}