133.Spark大手電子商取引プロジェクト-ユーザーの活躍度分析モジュール-指定時間内に最もアクセス数の多い10人のユーザーを統計する
この記事では,指定された時間内に最もアクセス数の多い10人のユーザを統計し,SparkSessionを用いてデータを解析する.
コード#コード#
UserActiveDegreeAnalyze.scala
実行結果
コード#コード#
UserActiveDegreeAnalyze.scala
package graduation.scala.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* FileName: UserActiveDegreeAnalyze
* Author: hadoop
* Email: [email protected]
* Date: 19-8-4 11:31
* Description:
*
*
*
* , spark , scala+eclipse
*
* , java spark , hadoop ,hadoop mapreduce、hbase, java
* ,
*
* , , spark ,spark scala , spark scala
* , scala
*
*/
object UserActiveDegreeAnalyze {
case class UserActionLog(logId: Long, userId: Long, actionTime: String, actionType: Long, purchaseMoney: Double)
case class UserActionLogVO(logId: Long, userId: Long, actionValue: Long)
case class UserActionLogWithPurchaseMoneyVO(logId: Long, userId: Long, purchaseMoney: Double)
def main(args: Array[String]) {
// , , mysql (java web , java web mysql )
// , ,
// ,
val startDate = "2019-08-01";
val endDate = "2019-09-01";
//
// spark 2.0 , , ,
// , spark 2.0
val conf = new SparkConf().setAppName("UserActiveDegreeAnalyze").setMaster("local[2]")
/*val spark = SparkSession
.builder()
.appName("UserActiveDegreeAnalyze")
.master("local")
.config("spark.sql.warehouse.dir", "D:\\test\\spark\\mall\\spark-warehouse")
.getOrCreate()*/
val spark = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext
// spark
import spark.implicits._
// spark sql functions
import org.apache.spark.sql.functions._
val dataPath:String ="/home/hadoop/IdeaProjects/BigDataGraduationProject/log/"
//
val userBaseInfo = spark.read.json(dataPath+"user_base_info.json")
val userActionLog = spark.read.json(dataPath+"user_action_log.json")
// : 10
// : , ,pm , 100 ~1000 ,
//{"logId": 00,"userId": 0, "actionTime": "2019-08-05 22:32:36", "actionType": 0, "purchaseMoney": 0.0}
//{"userId": 0, "username": "user0", "registTime": "2019-08-05 22:32:36"}
userActionLog
// : ,
.filter("actionTime >= '" + startDate + "' and actionTime <= '" + endDate + "' and actionType = 0")
// :
.join(userBaseInfo, userActionLog("userId") === userBaseInfo("userId"))
// : , userid username
.groupBy(userBaseInfo("userId"), userBaseInfo("username"))
// :
.agg(count(userActionLog("logId")).alias("actionCount"))
// :
.sort($"actionCount".desc)
// :
.limit(10)
// : , , mysql
.show()
sc.stop()
spark.stop()
}
}
実行結果
+------+----------+-----------+
|userId| username|actionCount|
+------+----------+-----------+
| 63|userName63| 212|
| 4| userName4| 210|
| 17|userName17| 210|
| 45|userName45| 176|
| 35|userName35| 174|
| 15|userName15| 162|
| 37|userName37| 156|
| 69|userName69| 153|
| 36|userName36| 122|
| 99|userName99| 114|
+------+----------+-----------+