Flinkリアルタイムコンピューティングステータスプログラミングケース-リアルタイム統計当日ダウンロード量ランキング(Java実装)
92803 ワード
一、需要の簡単な概要
1. 原因:この文章を見た人は毎日のダウンロード数ランキングを統計するという需要がT+1のオフラインバッチ需要に聞こえると思いますが、実は私もそう思っています.だから、どうしてこれを書きますか.これは以前の需要であり,以前はリアルタイム統計の需要であったが,順位などは後期のインタフェースでデータベースのデータを読み取ることによって実現されたものであり,今ではインタフェースによってデータベースのデータを取得してソートするなどの効率が低いと感じ,直接ソート結果をデータベースに書き込んでほしい.これも、普段はScaleでSparkやFlinkを書くのに慣れている理由ですが、今回はJavaで書く理由は、以前のプロジェクトに書いてあるので、以前の書き方を使わなければなりません.そして私はどうして文章にまとめたのですか?この需要はFlinkのステータスプログラミング、WaterMark、時間の意味、ステータスバックエンド、タイマーなどを使っているので、最初はネットで関連資料を探しても多くないので、やはり少し記録的な意味があります. 2. 需要:最初はリアルタイムで当日の0时から计算时までのこの时间帯の各ゲームのダウンロード量ランキングを统计すると思っていたので、このように実现して、后で私は书き终わってやっとリアルタイムでダウンロード量の最大のあのゲームを统计するだけでいいことを知っていて、実は差は多くないので、コードの中で私は注釈の一部があります. 3. 実現構想:データを取得した後、すべてのゲームの数はそれほど大きくないため、すべてのゲームのダウンロード量を1つの状態で保存し、後で最大ダウンロード量のゲームデータを統計して入庫するのに便利である.だから同じ日のデータは同じ状態を使うことができて、その問題は来て、この今日の状態の後で何の役にも立たないはずで、それではこの状態は毎日1つのますます多くなるのではありませんて、だから私はタイマーを設置して、タイマーは2日後にトリガーして、タイマーがトリガーする時の操作は2日前のその状態に対して1つのクリアを行います.タイマーを保存するには2番目のステータスが必要です.また、3番目のタイマーを再使用するために設定しました.ここにはもう一つの細かい点があります.例えば、あなたのデータは2日後になってから着いたのです.この時、2日前のタイマーが空になったら、データの間違いを招きやすいので、watermarkの時間と伝達の時間を比較しなければなりません.もし伝達の時間に2日がwatermarkより大きい時間を加えたら、処理しません.大体の考え方はこのようにして、このような需要は今日の零点から今までのダウンロード量が最も大きくて、時計ごとにある時間ではありませんて、5分ごとのダウンロード量が最も大きいこのような実用的なWindowは便利で、そんなに多くの状態を考慮する必要はありません.しかし、ここにはWindowを加えることもできます.何分ごとに実行されますが、私はここに追加していません.差も多くありません.最後にMongoDBデータベースに出力しますが、ここでは出力コードを貼り付けません.
二、コード実装
pom.xml:
データ形式:
1つのデータは1回のダウンロードで、上のこのデータの意味はimei 1というデバイスが1576684800というタイムスタンプでid 1のゲームをダウンロードしたことです. Topology.java SortDowComplete.java SortKeyByFunction.java KeyTimeProcessFunction.java SortGameDownloads.java IMongoUpdate.java
1. 原因:この文章を見た人は毎日のダウンロード数ランキングを統計するという需要がT+1のオフラインバッチ需要に聞こえると思いますが、実は私もそう思っています.だから、どうしてこれを書きますか.これは以前の需要であり,以前はリアルタイム統計の需要であったが,順位などは後期のインタフェースでデータベースのデータを読み取ることによって実現されたものであり,今ではインタフェースによってデータベースのデータを取得してソートするなどの効率が低いと感じ,直接ソート結果をデータベースに書き込んでほしい.これも、普段はScaleでSparkやFlinkを書くのに慣れている理由ですが、今回はJavaで書く理由は、以前のプロジェクトに書いてあるので、以前の書き方を使わなければなりません.そして私はどうして文章にまとめたのですか?この需要はFlinkのステータスプログラミング、WaterMark、時間の意味、ステータスバックエンド、タイマーなどを使っているので、最初はネットで関連資料を探しても多くないので、やはり少し記録的な意味があります. 2. 需要:最初はリアルタイムで当日の0时から计算时までのこの时间帯の各ゲームのダウンロード量ランキングを统计すると思っていたので、このように実现して、后で私は书き终わってやっとリアルタイムでダウンロード量の最大のあのゲームを统计するだけでいいことを知っていて、実は差は多くないので、コードの中で私は注釈の一部があります. 3. 実現構想:データを取得した後、すべてのゲームの数はそれほど大きくないため、すべてのゲームのダウンロード量を1つの状態で保存し、後で最大ダウンロード量のゲームデータを統計して入庫するのに便利である.だから同じ日のデータは同じ状態を使うことができて、その問題は来て、この今日の状態の後で何の役にも立たないはずで、それではこの状態は毎日1つのますます多くなるのではありませんて、だから私はタイマーを設置して、タイマーは2日後にトリガーして、タイマーがトリガーする時の操作は2日前のその状態に対して1つのクリアを行います.タイマーを保存するには2番目のステータスが必要です.また、3番目のタイマーを再使用するために設定しました.ここにはもう一つの細かい点があります.例えば、あなたのデータは2日後になってから着いたのです.この時、2日前のタイマーが空になったら、データの間違いを招きやすいので、watermarkの時間と伝達の時間を比較しなければなりません.もし伝達の時間に2日がwatermarkより大きい時間を加えたら、処理しません.大体の考え方はこのようにして、このような需要は今日の零点から今までのダウンロード量が最も大きくて、時計ごとにある時間ではありませんて、5分ごとのダウンロード量が最も大きいこのような実用的なWindowは便利で、そんなに多くの状態を考慮する必要はありません.しかし、ここにはWindowを加えることもできます.何分ごとに実行されますが、私はここに追加していません.差も多くありません.最後にMongoDBデータベースに出力しますが、ここでは出力コードを貼り付けません.
二、コード実装
pom.xml:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-coreartifactId>
<version>1.8.1version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>1.8.1version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_2.11artifactId>
<version>1.8.1version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-statebackend-rocksdb_2.11artifactId>
<version>1.8.1version>
dependency>
<dependency>
<groupId>org.mongodbgroupId>
<artifactId>mongo-java-driverartifactId>
<version>3.8.2version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-hadoop-compatibility_2.11artifactId>
<version>1.8.1version>
dependency>
<dependency>
<groupId>org.mongodb.mongo-hadoopgroupId>
<artifactId>mongo-hadoop-coreartifactId>
<version>2.0.0version>
dependency>
データ形式:
id
id1 1576684800 imei1 version1 channel1
1つのデータは1回のダウンロードで、上のこのデータの意味はimei 1というデバイスが1576684800というタイムスタンプでid 1のゲームをダウンロードしたことです.
import com.mongodb.client.model.WriteModel;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class Topology {
public static final String ProjectName = "halo-download";
public static void main(String[] args) throws Exception {
Logger logger = LoggerFactory.getLogger(Topology.class);
//
ParameterTool params = ParameterTool
.fromPropertiesFile(
Topology.class.getResourceAsStream("/normal.properties")
).mergeWith(ParameterTool.fromPropertiesFile(
Topology.class.getResourceAsStream("/mongodb.properties")
)).mergeWith(ParameterTool.fromPropertiesFile(
Topology.class.getResourceAsStream("/loghub.properties")
)).mergeWith(
ParameterTool.fromArgs(args)
);
String hdfsMaster = params.get("hdfs", "hdfs://");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.enableCheckpointing(Time.seconds(
Integer.valueOf(params.get("checkpoint.sec", "300"))
).toMilliseconds())
.setStateBackend(new RocksDBStateBackend(params.get("RocksDBStateBackend", hdfsMaster + "/flink/checkpoints")));
env.getConfig().setGlobalJobParameters(params);
//
String execEnv = params.get("exec.env", "dev");
//
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// log
env.getConfig().enableSysoutLogging();
// , ,EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// latency
env.getConfig().setLatencyTrackingInterval(1000);
// Watermark 100
env.getConfig().setAutoWatermarkInterval(100L);
logger.info("execEnv: " + execEnv);
// ,
// , ,
//id1 1576684800 imei1 version1 channel1
//12-22 id2 1576944000 imei2 version2 channel1 12-22 , ,
//12-23 id3 1577030400 imei2 version2 channel1
//12-24 id4 1577116800 imei2 version2 channel1
//12-25 id5 1577203200 imei2 version2 channel1
//12-26 id6 1577289600 imei2 version2 channel1
//12-27 id7 1577376000 imei2 version2 channel1
//12-28 id8 1577462400 imei2 version2 channel1
SingleOutputStreamOperator<SortDowComplete> localhost = env.socketTextStream("localhost", 8888).map(new MapFunction<String, SortDowComplete>() {
@Override
public SortDowComplete map(String s) throws Exception {
String[] splits = s.split("\\W+");
SortDowComplete sortDowComplete = new SortDowComplete(splits[0], Integer.parseInt(splits[1]), splits[2], splits[3], splits[4]);
return sortDowComplete;
}
});//.setParallelism(1)
SingleOutputStreamOperator<List<WriteModel<? extends Document>>> process =
// WaterMark, , , watermark
//https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamp_extractors.html
localhost.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SortDowComplete>() {
@Override
public long extractAscendingTimestamp(SortDowComplete sortDowComplete) {
return sortDowComplete.getTs() * 1000L;
}
}).setParallelism(1)
// keyBy , processFunction
.keyBy(new SortKeyByFunction()).process(new KeyTimeProcessFunction());
process.println();//
}
}
/**
* @Author: fseast
* @Date: 2020/3/24 8:32
* @Description:
*/
public class SortDowComplete{
private String gameId;
private long ts;
private String imei;
private String haloVersion;
private String haloChannel;
public SortDowComplete(){}
public SortDowComplete(String gameId, int ts, String imei, String haloVersion, String haloChannel) {
this.gameId = gameId;
this.ts = ts;
this.imei = imei;
this.haloVersion = haloVersion;
this.haloChannel = haloChannel;
}
public String getGameId() {
return gameId;
}
public void setGameId(String gameId) {
this.gameId = gameId;
}
public Long getTs() {
return ts;
}
public void setTs(int ts) {
this.ts = ts;
}
public String getImei() {
return imei;
}
public void setImei(String imei) {
this.imei = imei;
}
public String getHaloVersion() {
return haloVersion;
}
public void setHaloVersion(String haloVersion) {
this.haloVersion = haloVersion;
}
public String getHaloChannel() {
return haloChannel;
}
public void setHaloChannel(String haloChannel) {
this.haloChannel = haloChannel;
}
@Override
public String toString() {
return "SortDowComplete{" +
"gameId='" + gameId + '\'' +
", ts=" + ts +
", haloVersion='" + haloVersion + '\'' +
", haloChannel='" + haloChannel + '\'' +
", imei='" + imei + '\'' +
'}';
}
}
import org.apache.flink.api.java.functions.KeySelector;
/**
* @Author: fseast
* @Date: 2020/3/24 11:14
* @Description:
*/
public class SortKeyByFunction implements KeySelector<SortDowComplete,Long> {
@Override
public Long getKey(SortDowComplete sortDowComplete) throws Exception {
return sortDowComplete.getTs();
}
}
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.bson.Document;
import java.util.*;
/**
* @Author: fseast
* @Date: 2020/3/24 5:23
* @Description:
*/
public class KeyTimeProcessFunction extends KeyedProcessFunction<Long,SortDowComplete,List<WriteModel<? extends Document>>> {
//private ValueState gameState1;
//
// state, ,state MapState,key gameId,value (ga, )
private transient MapState<String,Integer> gameState;
// state, ,
private transient ValueState<Long> timerState;
// , 、 imei, gameID
private transient MapState<String,Integer> distinctState;
//List list ;
//Map map;
@Override
public void open(Configuration parameters) throws Exception {
//System.out.println("====");
//super.open(parameters);
gameState = getRuntimeContext().getMapState(new MapStateDescriptor<>("gameState", String.class,Integer.class));
timerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerState",Long.class));
distinctState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("distinctState", String.class, Integer.class));
//map = new HashMap<>();
//list = new ArrayList<>();
}
@Override
public void processElement(SortDowComplete sortDowComplete, Context context, Collector<List<WriteModel<? extends Document>>> collector) throws Exception {
//set
TreeSet<SortGameDownloads> set = new TreeSet<>();
// , 1000 。
long oneTime = sortDowComplete.getTs() * 1000L;
long watermark = context.timerService().currentWatermark() + 1L;
//System.out.println(day);//null 1
System.out.println(" process :" + context.timerService().currentProcessingTime() + ",watermark :"+ (context.timerService().currentWatermark()+1));
System.out.println("sortDowComplete :" + oneTime);
//System.out.println(" state:"+ timerState.value());
// , value null
//watermark , , 。
//watermark , state 。
// , , ,
if (oneTime > watermark && timerState.value() == null) {
// ,
long timerTs = oneTime + 129600000L;// 36 , 3 ,5 0
context.timerService().registerEventTimeTimer(timerTs);
// state ,
timerState.update(timerTs);
System.out.println(" :" + timerTs);
}
//System.out.println(" state:"+ timerState.value());
/*Iterator> iterator = gameState.iterator();
while (iterator.hasNext()) {
Map.Entry next = iterator.next();
System.out.println("gameState key:"+next.getKey()+"==,value:"+next.getValue());
}
gameState.put(sortDowComplete.getGameId(),101);*/
// 24 , , 3 , 5 ,
long judge = oneTime + 172800000;
//System.out.println(" :"+judge+" ,watermark:"+watermark);
if (judge <= watermark){
System.out.println(" ");
}else {
System.out.println(" ");
// ID 。
Integer gameDowNum = gameState.get(sortDowComplete.getGameId());
String stateKey = sortDowComplete.getGameId() + "_" + sortDowComplete.getImei();
System.out.println(stateKey+", "+distinctState.get(stateKey));
// ,
if (gameDowNum != null){
Integer dist = distinctState.get(stateKey);
// ,
if (dist == null || dist == 0){
gameDowNum +=1;
//
gameState.put(sortDowComplete.getGameId(),gameDowNum);
//
distinctState.put(stateKey,1);// imei
}
} else if (gameDowNum == null || gameDowNum == 0){// , 1,
gameState.put(sortDowComplete.getGameId(),1);
distinctState.put(stateKey,1);// imei
}
Iterator<Map.Entry<String, Integer>> iterator = gameState.iterator();
while (iterator.hasNext()){
Map.Entry<String, Integer> next = iterator.next();
//System.out.println("key:"+next.getKey()+",value:"+next.getValue());
SortGameDownloads sortGameDownloads = new SortGameDownloads(sortDowComplete.getTs(),next.getKey(), next.getValue());
set.add(sortGameDownloads);
}
List<WriteModel<? extends Document>> writeModels = new ArrayList<>();
//
SortGameDownloads first = set.first();
first.setTop(1);
writeModels.add(new UpdateOneModel(first.getQueryDoc(),first.getUpdateDoc(),first.getOption()));
/* ,
Iterator iteSort = set.iterator();
Integer i = 1;
Integer temp = -1;
while (iteSort.hasNext()) {
SortGameDownloads sortNext = iteSort.next();
if (i == 1){
temp = sortNext.getDownloads();
//
sortNext.setTop(1);
//writeModels.add(new DeleteManyModel(sortNext.deleteFilter()));
writeModels.add(new UpdateOneModel(sortNext.getQueryDoc(),sortNext.getUpdateDoc(),sortNext.getOption()));
i = 2;
} else if (i == 2){//
if (temp == sortNext.getDownloads()){//
//
sortNext.setTop(1);
writeModels.add(new UpdateOneModel(sortNext.getQueryDoc(),sortNext.getUpdateDoc(),sortNext.getOption()));
i = 2;
} else {
i ++;
break;
}
}
}*/
System.out.println("set ");
System.out.println(first);
System.out.println(set);
System.out.println("set ");
/*for (SortGameDownloads sortGameDow : set) {
writeModels.add(new UpdateOneModel(sortGameDow.getQueryDoc(),sortGameDow.getUpdateDoc(),sortGameDow.getOption()));
}*/
System.out.println(writeModels);
System.out.println("===1111111");
collector.collect(writeModels);
}
System.out.println();
}
// watermark ,
// keyBy key ,
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<WriteModel<? extends Document>>> out) throws Exception {
//super.onTimer(timestamp, ctx, out);
// keyBy state, 。
gameState.clear();
timerState.clear();
distinctState.clear();
System.out.println("== :"+timestamp);
}
}
import com.mongodb.client.model.UpdateOptions;
import org.bson.Document;
import java.util.Date;
/**
* @Author: fseast
* @Date: 2020/3/26 5:24
* @Description:
*/
public class SortGameDownloads implements Comparable , IMongoUpdate {
private Long time;
private String gameId;
private Integer downloads;
private Integer top = -1;
public SortGameDownloads(Long time, String gameId, Integer downloads) {
this.time = time;
this.gameId = gameId;
this.downloads = downloads;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public String getGameId() {
return gameId;
}
public void setGameId(String gameId) {
this.gameId = gameId;
}
public Integer getDownloads() {
return downloads;
}
public void setDownloads(Integer downloads) {
this.downloads = downloads;
}
public Integer getTop() {
return top;
}
public void setTop(Integer top) {
this.top = top;
}
@Override
public String toString() {
return "SortGameDownloads{" +
"time=" + time +
", gameId='" + gameId + '\'' +
", downloads=" + downloads +
", top=" + top +
'}';
}
@Override
public int compareTo(Object o) {
if (o instanceof SortGameDownloads){
SortGameDownloads sort = (SortGameDownloads) o;
// , id
int num = -(this.downloads - sort.downloads);
if (num == 0){
return this.gameId.compareTo(sort.gameId);
}
return num;
}
return 0;
}
@Override
public Document getQueryDoc() {
Document query = new Document();
//query.append("game_id",gameId);
query.append("time",new Date(time * 1000L));
return query;
}
@Override
public Document getUpdateDoc() {
Document inc = new Document();
inc.append("downloads",downloads);
inc.append("top",top);
inc.append("game_id",gameId);
return new Document("$set",inc);
}
@Override
public UpdateOptions getOption() {
return new UpdateOptions().upsert(true);
}
//
/*public Document deleteFilter(){
Document delete = new Document();
delete.append("time",new Date(time * 1000L));
return delete;
}*/
}
import com.mongodb.client.model.UpdateOptions;
import org.bson.Document;
public interface IMongoUpdate {
Document getQueryDoc();
Document getUpdateDoc();
UpdateOptions getOption();
}