Flink で Event Time を使ったリアルタイムデータ処理
5589 ワード
// Flink supports three notions of time:
// - Event time: when the event actually occurred, carried inside the record itself.
// - Processing time: the wall-clock time of the machine running the operator.
// - Ingestion time: when the record first enters the streaming system.
// Event time is the right choice when records can arrive late or out of order
// (e.g. an event produced 30 min ago arriving only now) and results must still
// be assigned to the correct window.
// The Kafka producer below generates test records carrying event-time timestamps.
package kafka.partition.test;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Test-data generator: sends 21 records into the "event_time" topic,
 * distributed round-robin over partitions 0/1/2, each record carrying an
 * event-time timestamp that lags the wall clock by 4s/5s/8s respectively.
 * The timestamp is embedded as the first space-separated token of the value
 * so the Flink-side timestamp extractor can parse it back out.
 */
public class PartitionProducer {

	private static final String TOPIC = "event_time";

	public static void main(String[] args) {
		// Typed properties map (the original used raw types).
		Map<String, Object> props = new HashMap<>();
		props.put("acks", "1");
		props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("bootstrap.servers", "bigdata01:9092");

		// Artificial event-time lag (ms) for partitions 0, 1 and 2. The Flink
		// watermark assigner allows 5000 ms of out-of-orderness, so partition 2
		// (8000 ms behind) deliberately produces "late" events.
		long[] lagByPartition = {4000L, 5000L, 8000L};

		// try-with-resources guarantees the producer is flushed and closed even
		// if send() throws; the original leaked it on any exception.
		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			for (int i = 0; i <= 20; i++) {
				int partition = i % 3;
				long mills = System.currentTimeMillis();
				// Leading token = event timestamp; rest is the payload text.
				String line = (mills - lagByPartition[partition]) + " "
						+ "partition-" + partition + "--this is a big +" + i;
				// Explicit partition, String key, no explicit record timestamp
				// (same as the original's null timestamp argument).
				ProducerRecord<String, String> record =
						new ProducerRecord<>(TOPIC, partition, String.valueOf(i), line);
				producer.send(record);
			}
		}
	}
}
// TimestampsAndWatermarks
package flink.streaming
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
/**
 * Periodic watermark assigner for records whose first space-separated token
 * is an epoch-millis event timestamp.
 *
 * Fix: the original derived the watermark from `System.currentTimeMillis()`
 * (processing time). Under event-time semantics the watermark must trail the
 * maximum *event* timestamp seen so far, otherwise windows can fire based on
 * wall-clock progress and drop valid events whose event time lags real time.
 */
class CustomWaterMarks extends AssignerWithPeriodicWatermarks[String]{
// Maximum tolerated out-of-orderness in milliseconds.
val maxOutOrderness = 5000L
// Highest event timestamp observed so far. Initialized so the very first
// watermark (before any element arrives) is Long.MinValue.
private var currentMaxTimestamp = Long.MinValue + maxOutOrderness
// Called periodically by Flink; the watermark trails the max seen event time.
override def getCurrentWatermark():Watermark ={
new Watermark(currentMaxTimestamp - maxOutOrderness)
}
// Extract the event time (leading epoch-millis token) and track the maximum.
override def extractTimestamp(element: String,previousElementTimestamp: Long): Long = {
val ts = element.split(" ").head.toLong
currentMaxTimestamp = math.max(currentMaxTimestamp, ts)
ts
}
}
package flink.streaming
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
object StreamWithEventTimeAndWaterMarks {
// Flink streaming job: consumes the "event_time" Kafka topic, assigns
// event-time timestamps/watermarks via CustomWaterMarks, and word-counts
// the records in 5-second event-time tumbling windows.
def main(args: Array[String]): Unit = {
val kafkaProps = new Properties()
// Kafka broker address.
kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")
// Consumer group id.
kafkaProps.setProperty("group.id", "group2")
// Obtain the streaming execution environment.
val evn = StreamExecutionEnvironment.getExecutionEnvironment
// Window by event time (timestamps embedded in the records), not wall clock.
evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Kafka consumer reading the "event_time" topic as plain strings.
val kafkaSource = new FlinkKafkaConsumer[String]("event_time",new SimpleStringSchema,kafkaProps)
// Attach the custom timestamp extractor / watermark generator.
kafkaSource.assignTimestampsAndWatermarks(new CustomWaterMarks)
// Start from the latest offsets instead of the committed group offsets.
//kafkaSource.setStartFromGroupOffsets()
kafkaSource.setStartFromLatest()
// Commit Kafka offsets back to the broker only on completed checkpoints.
kafkaSource.setCommitOffsetsOnCheckpoints(true)
// (superseded by the exactly-once enableCheckpointing call below)
//evn.enableCheckpointing(2000)
// Register the Kafka consumer as the job's source.
val stream = evn.addSource(kafkaSource)
evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
//stream.setParallelism(3)
// drop(1) discards the leading timestamp token; remaining words are counted
// as (word, 1) pairs, keyed by word, summed per 5s event-time window.
val text = stream.flatMap{ _.toLowerCase().split(" ").drop(1).filter { _.nonEmpty} }
.map{(_,1)}
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.map(x=>{(x._1,(new Integer(x._2)))})
text.print()
// Optional custom sink, disabled in this example.
//text.addSink(new Ssinks())
evn.execute("kafkawd")
}
}
partition-2
1> (big,14)
4> (is,14)
1> (+0,1)
2> (+1,1)
3> (partition-1--this,7)
4> (+15,1)
3> (+12,1)
1> (partition-0--this,7)
3> (+6,1)
1> (+16,1)
4> (+10,1)
2> (+18,1)
4> (+7,1)
3> (+3,1)
2> (+9,1)
3> (+19,1)
2> (+13,1)
3> (a,14)
2> (+4,1)
「ITPUBブログ」からのリンク:http://blog.itpub.net/31506529/viewspace-2637182/転載する必要がある場合は、出典を明記してください.そうしないと、法律責任を追及します.
転載先:http://blog.itpub.net/31506529/viewspace-2637182/