Flink-電子商取引ユーザー行動分析(kafka)


この記事は https://blog.csdn.net/qq_46548855/article/details/107144990 の内容をもとに、Source を Kafka に、Sink を Kafka・ES・Redis に変更したものです。
実際の本番環境では、データストリームは Kafka から取得することが多いです。コードをより本番環境に近づけるには、source を Kafka に変更するだけで済みます。
 // Kafka consumer settings handed to FlinkKafkaConsumer.
 // No key/value deserializer entries are set: FlinkKafkaConsumer decodes records with the
 // DeserializationSchema it is constructed with (SimpleStringSchema below) and ignores
 // "key.deserializer" / "value.deserializer", so configuring them has no effect.
 val properties = new Properties()
 properties.setProperty("bootstrap.servers", "hadoop102:9092")
 properties.setProperty("group.id", "consumer-group")
 // Start from the newest records when the group has no committed offset.
 properties.setProperty("auto.offset.reset", "latest")


// Source: read the "hotitems" topic as plain strings, using the consumer properties above.
val stream = {
  val hotItemsConsumer =
    new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties)
  env.addSource(hotItemsConsumer)
}
import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer
import scala.tools.cmd.{Property, PropertyMapper, Reference, Spec}

/**
 * One user-behavior record as parsed from a comma-separated input line,
 * e.g. `834377,4541270,3738615,pv,1511658000`.
 *
 * @param UserID     first field of the line
 * @param itemId     second field; the job keys its windows by this value
 * @param categoryId third field
 * @param behavior   fourth field; the job keeps only records whose behavior is "pv"
 * @param timestamp  fifth field; the job multiplies it by 1000 to obtain the event time
 *                   (presumably epoch seconds — confirm against the data producer)
 */
case class UserBehavior(
  UserID: Long,
  itemId: Long,
  categoryId: Int,
  behavior: String,
  timestamp: Long
)
/**
 * Aggregated result of one window for one item.
 *
 * @param itemId    the item the count belongs to
 * @param windowEnd end timestamp of the window that produced the count
 * @param count     number of records counted for the item in that window
 */
case class ItemViewCount(
  itemId: Long,
  windowEnd: Long,
  count: Long
)


/**
 * Hot-items job: reads user-behavior records from the Kafka topic "hotitems", counts "pv"
 * records per item over sliding one-hour windows that advance every five minutes, and
 * prints the top 3 items of every window.
 */
object kafkahot {

  /**
   * Parses one comma-separated record such as `834377,4541270,3738615,pv,1511658000`.
   *
   * @param line raw record text as read from Kafka
   * @return the parsed record, or `None` when the line has fewer than five fields or a
   *         numeric field cannot be parsed; such lines are reported on stderr
   */
  private def parseUserBehavior(line: String): Option[UserBehavior] = {
    val fields = line.split(",").map(_.trim)
    try {
      Some(
        UserBehavior(
          fields(0).toLong,
          fields(1).toLong,
          fields(2).toInt,
          fields(3),
          fields(4).toLong
        )
      )
    } catch {
      // A single bad record must not bring the whole streaming job down.
      case _: NumberFormatException | _: ArrayIndexOutOfBoundsException =>
        Console.err.println(s"Skipping malformed record: $line")
        None
    }
  }

  def main(args: Array[String]): Unit = {
    // Execution environment: single parallel instance, event-time semantics.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer settings. Key/value deserializers are not configured here because
    // FlinkKafkaConsumer decodes records with the SimpleStringSchema passed below and
    // ignores those two properties.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("auto.offset.reset", "latest")

    // Source: read lines from Kafka and parse them; malformed lines are dropped.
    val dataStream = env
      .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
      .flatMap((line: String) => parseUserBehavior(line).toList)
      // The record timestamp is scaled by 1000 to get the event time.
      // NOTE(review): assumes records arrive in ascending timestamp order — confirm.
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Transform: count page views per item per window, then rank the items of each window.
    val processedStream = dataStream
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountAgg, new WindowResult())
      .keyBy(_.windowEnd) // group the per-item counts that belong to the same window
      .process(new TopNHot(3))

    // Sink: print the formatted ranking to stdout.
    processedStream.print()

    env.execute("hot items job")
  }

}

/**
 * Incremental aggregate that counts the [[UserBehavior]] records added to it.
 * Accumulator and result are both the running count.
 */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {

  /** A new accumulator starts at zero. */
  override def createAccumulator(): Long = 0L

  /** Every record adds one to the count; the record's contents are not inspected. */
  override def add(in: UserBehavior, acc: Long): Long = 1L + acc

  /** Two partial counts combine by addition. */
  override def merge(acc: Long, acc1: Long): Long = acc1 + acc

  /** The result is the accumulated count itself. */
  override def getResult(acc: Long): Long = acc
}

/**
 * Incremental aggregate that computes the average `timestamp` of the [[UserBehavior]]
 * records added to it.
 *
 * The accumulator is a pair `(sum of timestamps, number of records)`.
 */
class AverageAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {

  /** A new accumulator has a zero sum and a zero record count. */
  override def createAccumulator(): (Long, Int) = (0L, 0)

  /** Adds the record's timestamp to the sum and bumps the record count. */
  override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) =
    (acc._1 + in.timestamp, acc._2 + 1)

  /**
   * Returns the mean timestamp.
   *
   * The sum is converted to `Double` before dividing so the fractional part is kept;
   * dividing the `Long` sum by the `Int` count directly would truncate the quotient.
   * An accumulator with no records yields `NaN`.
   */
  override def getResult(acc: (Long, Int)): Double = acc._1.toDouble / acc._2

  /** Combines two partial accumulators component-wise. */
  override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) =
    (acc._1 + acc1._1, acc._2 + acc1._2)
}

/**
 * Window function that wraps a window's aggregated count into an [[ItemViewCount]].
 *
 * Type parameters of `WindowFunction`: input is the `Long` count, output is
 * [[ItemViewCount]], the key is the `Long` item id, and the window is a `TimeWindow`.
 */
class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {

  /**
   * Emits one [[ItemViewCount]] built from the key, the window's end timestamp and the
   * first element of `input`.
   */
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long],
                     out: Collector[ItemViewCount]): Unit = {
    val viewCount = input.head
    val windowEnd = window.getEnd
    out.collect(ItemViewCount(key, windowEnd, viewCount))
  }
}

/**
 * Collects the [[ItemViewCount]] records of one key and, when that key's timer fires,
 * emits a formatted report of the `topSize` records with the highest counts.
 *
 * The key type is `Long`; every record registers an event-time timer at its
 * `windowEnd + 1`.
 *
 * @param topSize maximum number of items listed in each report
 */
class TopNHot(topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

  // Buffers every record received for the current key until the timer fires.
  private var itemState: ListState[ItemViewCount] = _

  /** Obtains the list state handle named "item-state" from the runtime context. */
  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(
      new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
  }

  /** Stores the record in state and registers an event-time timer at `windowEnd + 1`. */
  override def processElement(i: ItemViewCount,
                              context: KeyedProcessFunction[Long, ItemViewCount, String]#Context,
                              collector: Collector[String]): Unit = {
    itemState.add(i)
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
  }

  /**
   * Sorts the buffered records by count (descending), keeps the first `topSize`, clears
   * the state and emits the report as a single string.
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // Explicit Java-to-Scala conversion instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    // ListState.get() may return null for empty state, so guard before converting.
    val buffered = Option(itemState.get()).map(_.asScala.toList).getOrElse(Nil)
    val topItems = buffered.sortBy(_.count)(Ordering.Long.reverse).take(topSize)

    // The buffered records are no longer needed once the ranking has been computed.
    itemState.clear()

    // NOTE(review): the label texts below look truncated (non-ASCII characters were
    // probably lost); they are kept exactly as found — confirm the intended wording.
    val result = new StringBuilder()
    // The timer was registered at windowEnd + 1, so subtract 1 to show the window end.
    result.append("  :").append(new Timestamp(timestamp - 1)).append("\n")
    for ((item, index) <- topItems.zipWithIndex) {
      result.append("No").append(index + 1).append(":")
        .append(" ID =").append(item.itemId)
        .append(" ").append(item.count)
        .append("\n")
    }
    result.append("=====================")

    // Emit immediately; sleeping here to pace the output would block the calling thread.
    out.collect(result.toString())
  }
}