Flink-電子商取引ユーザー行動分析(kafka)
36719 ワード
この文章の出典は https://blog.csdn.net/qq_46548855/article/details/107144990 です。
Kafka
実際の本番環境では、データストリームは Kafka から取得されることが多い。コードをより本番環境に近づけるには、source を Kafka に変更するだけでよい。
もちろん、必要に応じて Sink も Kafka、ES、Redis などに変更できる。
実際の本番環境では、データストリームは Kafka から取得されることが多い。コードをより本番環境に近づけるには、次のように source を Kafka に変更するだけでよい。
// Kafka consumer settings, applied to a fresh Properties instance in one pass.
val properties = new Properties()
Seq(
  "bootstrap.servers" -> "hadoop102:9092",
  "group.id" -> "consumer-group",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset" -> "latest"
).foreach { case (name, setting) => properties.setProperty(name, setting) }

// Source stream: records of the "hotitems" topic, each decoded as a String.
val stream = env.addSource(
  new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties)
)
import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
import scala.tools.cmd.{Property, PropertyMapper, Reference, Spec}
/**
 * One user-behavior record, built in `kafkahot.main` from a comma-separated
 * line with five fields (sample: `834377,4541270,3738615,pv,1511658000`).
 *
 * The fields are filled positionally from that line:
 *  - `UserID`     first field, parsed as Long
 *  - `itemId`     second field, parsed as Long; used as the key for counting
 *  - `categoryId` third field, parsed as Int
 *  - `behavior`   fourth field; the job only keeps records where this is "pv"
 *  - `timestamp`  fifth field, parsed as Long; the job multiplies it by 1000
 *                 to obtain the event time (presumably epoch seconds - confirm)
 *
 * NOTE(review): `UserID` is capitalised unlike the other fields; renaming it
 * would break callers that use named arguments, so it is left unchanged.
 */
case class UserBehavior(UserID:Long,itemId:Long,categoryId:Int,behavior:String,timestamp:Long)
/**
 * Per-window aggregation result for a single item.
 *
 *  - `itemId`    the key the window was grouped by
 *  - `windowEnd` end timestamp of the window (taken from `TimeWindow.getEnd`)
 *  - `count`     the aggregated count for that item in that window
 */
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
/**
 * Streaming job that reads user-behavior records from the Kafka topic
 * "hotitems", counts "pv" events per item over sliding windows and prints the
 * top 3 items of every window.
 */
object kafkahot {

  /**
   * Builds the pipeline and runs it under the job name "hot items job".
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Environment: parallelism 1, event-time semantics.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration.
    val properties = new Properties()
    Seq(
      "bootstrap.servers" -> "hadoop102:9092",
      "group.id" -> "consumer-group",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest"
    ).foreach { case (name, setting) => properties.setProperty(name, setting) }

    // Source. Each record is a CSV line such as: 834377,4541270,3738615,pv,1511658000
    val kafkaSource =
      new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties)

    val dataStream = env
      .addSource(kafkaSource)
      .map { line =>
        val fields = line.split(",")
        UserBehavior(
          fields(0).trim.toLong,
          fields(1).trim.toLong,
          fields(2).trim.toInt,
          fields(3).trim,
          fields(4).trim.toLong
        )
      }
      // The record timestamp is scaled by 1000 to serve as the event time.
      .assignAscendingTimestamps(record => record.timestamp * 1000L)

    // Transform, step 1: per-item "pv" counts over a 1-hour window sliding every 5 minutes.
    val windowedCounts = dataStream
      .filter(record => record.behavior == "pv")
      .keyBy(_.itemId)
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg, new WindowResult())

    // Transform, step 2: regroup the counts by window end and keep the top 3 per window.
    val processedStream = windowedCounts
      .keyBy(_.windowEnd)
      .process(new TopNHot(3))

    // Sink: print to stdout.
    processedStream.print()

    env.execute("hot items job")
  }
}
/**
 * Incremental aggregate that counts the [[UserBehavior]] records it receives.
 * Both the accumulator and the result are the running count.
 */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {

  /** A new accumulator starts at zero. */
  override def createAccumulator(): Long = {
    0L
  }

  /** Every record adds one to the count; the record's content is not inspected. */
  override def add(in: UserBehavior, acc: Long): Long = {
    acc + 1
  }

  /** The result is the accumulated count itself. */
  override def getResult(acc: Long): Long = {
    acc
  }

  /** Two partial counts are combined by summing them. */
  override def merge(acc: Long, acc1: Long): Long = {
    acc + acc1
  }
}
/**
 * Incremental aggregate that computes the arithmetic mean of the `timestamp`
 * field of the [[UserBehavior]] records it receives.
 *
 * The accumulator is a pair `(sum of timestamps, number of records)`.
 */
class AverageAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {

  /** A new accumulator has a zero sum and a zero record count. */
  override def createAccumulator(): (Long, Int) = (0L, 0)

  /** Adds the record's timestamp to the sum and increments the record count. */
  override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) =
    (acc._1 + in.timestamp, acc._2 + 1)

  /**
   * Returns the mean timestamp.
   *
   * The division is done in floating point: dividing the Long sum by the Int
   * count directly would truncate the fractional part before the widening to
   * Double, and would throw ArithmeticException for an empty accumulator.
   * With floating-point division an empty accumulator yields `Double.NaN`.
   */
  override def getResult(acc: (Long, Int)): Double =
    acc._1.toDouble / acc._2

  /** Combines two partial accumulators by adding sums and counts component-wise. */
  override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) =
    (acc._1 + acc1._1, acc._2 + acc1._2)
}
/**
 * Window function that wraps a window's pre-aggregated count into an
 * [[ItemViewCount]].
 *
 * Type parameters of the parent: input is the Long count, output is
 * ItemViewCount, the key is the Long item id, the window is a TimeWindow.
 */
class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {

  /**
   * Emits one ItemViewCount for the given key and window.
   *
   * @param key    the item id the window is keyed by
   * @param window the window being evaluated; its end timestamp is recorded
   * @param input  the aggregated values; only the first element is read
   * @param out    collector receiving the resulting ItemViewCount
   */
  override def apply(
      key: Long,
      window: TimeWindow,
      input: Iterable[Long],
      out: Collector[ItemViewCount]): Unit = {
    val windowEnd = window.getEnd
    val viewCount = input.iterator.next()
    val summary = ItemViewCount(key, windowEnd, viewCount)
    out.collect(summary)
  }
}
/**
 * Keyed process function (keyed by window end) that buffers every
 * [[ItemViewCount]] of a window and, once the window's timer fires, emits a
 * formatted report of the `topSize` items with the highest counts.
 *
 * @param topSize how many of the highest-count items to include in each report
 */
class TopNHot(topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

  // Buffer of all ItemViewCount records seen for the current key.
  private var itemState: ListState[ItemViewCount] = _

  /** Obtains the list state handle (registered under the name "item-state"). */
  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(
      new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
  }

  /**
   * Stores the incoming record and registers an event-time timer one unit
   * after the record's window end.
   */
  override def processElement(
      i: ItemViewCount,
      context: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
      collector: Collector[String]): Unit = {
    itemState.add(i)
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
  }

  /**
   * Fired by the timer registered in `processElement`: sorts the buffered
   * records by count (descending), keeps the first `topSize`, clears the
   * buffer and emits the report as a single String.
   */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    // Option(...) guards against a null Iterable from the state backend.
    val bufferedItems: List[ItemViewCount] =
      Option(itemState.get()).map(_.asScala.toList).getOrElse(Nil)

    val sortedItems = bufferedItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)

    // The buffer is no longer needed once the ranking has been computed.
    itemState.clear()

    // NOTE(review): the label literals below look truncated (their non-ASCII
    // text was probably lost); they are kept exactly as found.
    val result: StringBuilder = new StringBuilder()
    // The timer was registered at windowEnd + 1, so timestamp - 1 is the window end.
    result.append(" :").append(new Timestamp(timestamp - 1)).append("\n")

    sortedItems.zipWithIndex.foreach { case (currentItem, i) =>
      result.append("No").append(i + 1).append(":")
        .append(" ID =").append(currentItem.itemId)
        .append(" ").append(currentItem.count)
        .append("\n")
    }
    result.append("=====================")

    // No Thread.sleep here: sleeping inside a timer callback blocks the
    // operator's task thread and stalls the processing of further records.
    out.collect(result.toString())
  }
}