133. Spark Large-Scale E-Commerce Project - User Activity Analysis Module - Counting the Top 10 Users by Access Count in a Given Time Range


This article shows how to count the ten users with the most accesses within a specified time range, using SparkSession to analyze the data.
Code
UserActiveDegreeAnalyze.scala 
package graduation.scala.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * FileName: UserActiveDegreeAnalyze
  * Author:   hadoop
  * Email:    [email protected]
  * Date:     19-8-4   11:31
  * Description: User activity analysis: counts the 10 users with the most
  *              accesses within a specified time range, using the Spark SQL
  *              DataFrame API (Spark 2.x + Scala).
  */
object UserActiveDegreeAnalyze {

  case class UserActionLog(logId: Long, userId: Long, actionTime: String, actionType: Long, purchaseMoney: Double)
  case class UserActionLogVO(logId: Long, userId: Long, actionValue: Long)
  case class UserActionLogWithPurchaseMoneyVO(logId: Long, userId: Long, purchaseMoney: Double)

  def main(args: Array[String]) {
    // In a real workflow these dates would come from MySQL (a Java web
    // frontend would let an operator choose the range and save it there);
    // they are hardcoded here to keep the demo self-contained.
    val startDate = "2019-08-01"
    val endDate = "2019-09-01"

    // Build the entry point.
    // Since Spark 2.0, SparkSession is the unified entry point for Spark SQL;
    // it subsumes the older SQLContext and HiveContext.
    val conf = new SparkConf().setAppName("UserActiveDegreeAnalyze").setMaster("local[2]")
    /*val spark = SparkSession
      .builder()
      .appName("UserActiveDegreeAnalyze")
      .master("local")
      .config("spark.sql.warehouse.dir", "D:\\test\\spark\\mall\\spark-warehouse")
      .getOrCreate()*/
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Import Spark implicits (enables the $"col" syntax and Dataset encoders)
    import spark.implicits._
    // Import the Spark SQL built-in functions (count, etc.)
    import org.apache.spark.sql.functions._

    val dataPath: String = "/home/hadoop/IdeaProjects/BigDataGraduationProject/log/"
    // Load the mock data: user base info and user action logs, both JSON
    val userBaseInfo = spark.read.json(dataPath + "user_base_info.json")
    val userActionLog = spark.read.json(dataPath + "user_action_log.json")

    // Requirement 1: find the 10 users with the most accesses in the given time range.
    // Note on the mock data: actionType 0 marks an access event; purchase amounts
    // were generated randomly in roughly the 100~1000 range. Sample records:
    //{"logId": 00,"userId": 0, "actionTime": "2019-08-05 22:32:36", "actionType": 0, "purchaseMoney": 0.0}
    //{"userId": 0, "username": "user0", "registTime": "2019-08-05 22:32:36"}
    userActionLog
      // Step 1: keep only access events (actionType = 0) inside the date range
      .filter("actionTime >= '" + startDate + "' and actionTime <= '" + endDate + "' and actionType = 0")
      // Step 2: join with the user base info to pick up each user's username
      .join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId"))
      // Step 3: group by userId and username
      .groupBy(userBaseInfo("userId"), userBaseInfo("username"))
      // Step 4: aggregate: count the log entries per user
      .agg(count(userActionLog("logId")).alias("actionCount"))
      // Step 5: sort by access count, descending
      .sort($"actionCount".desc)
      // Step 6: keep only the top 10
      .limit(10)
      // Step 7: print the result; in production it would be written to MySQL
      .show()

    spark.stop() // also stops the underlying SparkContext

  }

}
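
The listing defines the case classes UserActionLog, UserActionLogVO, and UserActionLogWithPurchaseMoneyVO but never actually uses them. As a minimal sketch of how the same top-10 query could be expressed with the typed Dataset API (assuming the JSON fields match the UserActionLog fields exactly and contain no nulls; the username join is omitted for brevity):

// Hypothetical typed variant; relies on import spark.implicits._ being in scope
val actions = spark.read.json(dataPath + "user_action_log.json").as[UserActionLog]

actions
  .filter(a => a.actionTime >= startDate && a.actionTime <= endDate && a.actionType == 0)
  .groupByKey(_.userId)  // KeyValueGroupedDataset[Long, UserActionLog]
  .count()               // Dataset[(Long, Long)] of (userId, count)
  .toDF("userId", "actionCount")
  .orderBy($"actionCount".desc)
  .limit(10)
  .show()

The untyped DataFrame version in the main listing is what the original project uses; the typed variant mainly buys compile-time field checking in the filter lambda.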

Execution result
+------+----------+-----------+
|userId|  username|actionCount|
+------+----------+-----------+
|    63|userName63|        212|
|     4| userName4|        210|
|    17|userName17|        210|
|    45|userName45|        176|
|    35|userName35|        174|
|    15|userName15|        162|
|    37|userName37|        156|
|    69|userName69|        153|
|    36|userName36|        122|
|    99|userName99|        114|
+------+----------+-----------+
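
As the step-7 comment notes, a real deployment would persist this result to MySQL instead of calling show(). A minimal sketch of that write path, assuming the query above is first assigned to a val (e.g. top10Users) instead of ending in .show(), with placeholder URL, table name, and credentials:

// Hypothetical JDBC sink; the URL, table, and credentials below are placeholders
import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "secret")
props.setProperty("driver", "com.mysql.jdbc.Driver")

top10Users
  .write
  .mode("append") // or "overwrite", depending on how the web layer reads it
  .jdbc("jdbc:mysql://localhost:3306/mall", "top10_active_users", props)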