Flinkリアルタイムコンピューティングステータスプログラミングケース-リアルタイム統計当日ダウンロード量ランキング(Java実装)

92803 ワード

一、需要の簡単な概要
  1. 原因:この文章を見た人は毎日のダウンロード数ランキングを統計するという需要がT+1のオフラインバッチ需要に聞こえると思いますが、実は私もそう思っています.だから、どうしてこれを書きますか.これは以前の需要であり,以前はリアルタイム統計の需要であったが,順位などは後期のインタフェースでデータベースのデータを読み取ることによって実現されたものであり,今ではインタフェースによってデータベースのデータを取得してソートするなどの効率が低いと感じ,直接ソート結果をデータベースに書き込んでほしい.これも、普段はScaleでSparkやFlinkを書くのに慣れている理由ですが、今回はJavaで書く理由は、以前のプロジェクトに書いてあるので、以前の書き方を使わなければなりません.そして私はどうして文章にまとめたのですか?この需要はFlinkのステータスプログラミング、WaterMark、時間の意味、ステータスバックエンド、タイマーなどを使っているので、最初はネットで関連資料を探しても多くないので、やはり少し記録的な意味があります.  2. 需要:最初はリアルタイムで当日の0时から计算时までのこの时间帯の各ゲームのダウンロード量ランキングを统计すると思っていたので、このように実现して、后で私は书き终わってやっとリアルタイムでダウンロード量の最大のあのゲームを统计するだけでいいことを知っていて、実は差は多くないので、コードの中で私は注釈の一部があります.  3. 実現構想:データを取得した後、すべてのゲームの数はそれほど大きくないため、すべてのゲームのダウンロード量を1つの状態で保存し、後で最大ダウンロード量のゲームデータを統計して入庫するのに便利である.だから同じ日のデータは同じ状態を使うことができて、その問題は来て、この今日の状態の後で何の役にも立たないはずで、それではこの状態は毎日1つのますます多くなるのではありませんて、だから私はタイマーを設置して、タイマーは2日後にトリガーして、タイマーがトリガーする時の操作は2日前のその状態に対して1つのクリアを行います.タイマーを保存するには2番目のステータスが必要です.また、3番目のタイマーを再使用するために設定しました.ここにはもう一つの細かい点があります.例えば、あなたのデータは2日後になってから着いたのです.この時、2日前のタイマーが空になったら、データの間違いを招きやすいので、watermarkの時間と伝達の時間を比較しなければなりません.もし伝達の時間に2日がwatermarkより大きい時間を加えたら、処理しません.大体の考え方はこのようにして、このような需要は今日の零点から今までのダウンロード量が最も大きくて、時計ごとにある時間ではありませんて、5分ごとのダウンロード量が最も大きいこのような実用的なWindowは便利で、そんなに多くの状態を考慮する必要はありません.しかし、ここにはWindowを加えることもできます.何分ごとに実行されますが、私はここに追加していません.差も多くありません.最後にMongoDBデータベースに出力しますが、ここでは出力コードを貼り付けません.
二、コード実装
pom.xml:
        
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-coreartifactId>
            <version>1.8.1version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-javaartifactId>
            <version>1.8.1version>
        dependency>

        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-streaming-java_2.11artifactId>
            <version>1.8.1version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-statebackend-rocksdb_2.11artifactId>
            <version>1.8.1version>
        dependency>
        <dependency>
            <groupId>org.mongodbgroupId>
            <artifactId>mongo-java-driverartifactId>
            <version>3.8.2version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-hadoop-compatibility_2.11artifactId>
            <version>1.8.1version>
        dependency>
        <dependency>
            <groupId>org.mongodb.mongo-hadoopgroupId>
            <artifactId>mongo-hadoop-coreartifactId>
            <version>2.0.0version>
        dependency>  

データ形式:
  id	   	    	  		  
id1 1576684800 imei1 version1 channel1

1つのデータは1回のダウンロードで、上のこのデータの意味はimei 1というデバイスが1576684800というタイムスタンプでid 1のゲームをダウンロードしたことです.
  • Topology.java
  • import com.mongodb.client.model.WriteModel;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    
    public class Topology {
        public static final String ProjectName = "halo-download";
    
        public static void main(String[] args) throws Exception {
            Logger logger = LoggerFactory.getLogger(Topology.class);
            //               
            ParameterTool params = ParameterTool
                    .fromPropertiesFile(
                            Topology.class.getResourceAsStream("/normal.properties")
                    ).mergeWith(ParameterTool.fromPropertiesFile(
                            Topology.class.getResourceAsStream("/mongodb.properties")
                    )).mergeWith(ParameterTool.fromPropertiesFile(
                            Topology.class.getResourceAsStream("/loghub.properties")
                    )).mergeWith(
                            ParameterTool.fromArgs(args)
                    );
    
            String hdfsMaster = params.get("hdfs", "hdfs://");
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                    .enableCheckpointing(Time.seconds(
                            Integer.valueOf(params.get("checkpoint.sec", "300"))
                    ).toMilliseconds())
                    .setStateBackend(new RocksDBStateBackend(params.get("RocksDBStateBackend", hdfsMaster + "/flink/checkpoints")));
    
            env.getConfig().setGlobalJobParameters(params);
    
            //     
            String execEnv = params.get("exec.env", "dev");
            //         
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            //     log  
            env.getConfig().enableSysoutLogging();
            //          ,        ,EventTime
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // latency      
            env.getConfig().setLatencyTrackingInterval(1000);
            //  Watermark      100  
            env.getConfig().setAutoWatermarkInterval(100L);
    
    
            logger.info("execEnv: " + execEnv);
            
            //           ,               
            //        ,        ,           
            //id1 1576684800 imei1 version1 channel1          
            //12-22		id2 1576944000 imei2 version2 channel1         12-22         ,        ,      
            //12-23		id3 1577030400 imei2 version2 channel1 
            //12-24		id4 1577116800 imei2 version2 channel1 
            //12-25		id5 1577203200 imei2 version2 channel1 
            //12-26		id6 1577289600 imei2 version2 channel1 
            //12-27		id7 1577376000 imei2 version2 channel1 
            //12-28		id8 1577462400 imei2 version2 channel1 
            SingleOutputStreamOperator<SortDowComplete> localhost = env.socketTextStream("localhost", 8888).map(new MapFunction<String, SortDowComplete>() {
                @Override
                public SortDowComplete map(String s) throws Exception {
                    String[] splits = s.split("\\W+");
    
                    SortDowComplete sortDowComplete = new SortDowComplete(splits[0], Integer.parseInt(splits[1]), splits[2], splits[3], splits[4]);
                    return sortDowComplete;
                }
            });//.setParallelism(1)
            
    
            SingleOutputStreamOperator<List<WriteModel<? extends Document>>> process =
                    //  WaterMark,        ,                       ,          watermark              
                    //https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamp_extractors.html
                    localhost.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SortDowComplete>() {
                        @Override
                        public long extractAscendingTimestamp(SortDowComplete sortDowComplete) {
                            return sortDowComplete.getTs() * 1000L;
                        }
                    }).setParallelism(1)
                    //     keyBy   ,               processFunction 
                            .keyBy(new SortKeyByFunction()).process(new KeyTimeProcessFunction());
    
            process.println();//  
        }
    }
    
  • SortDowComplete.java
  • /**
     * @Author: fseast
     * @Date: 2020/3/24   8:32
     * @Description:
     */
    public class SortDowComplete{
        private String gameId;
        private long ts;
        private String imei;
        private String haloVersion;
        private String haloChannel;
        
        public SortDowComplete(){}
    
        public SortDowComplete(String gameId, int ts, String imei, String haloVersion, String haloChannel) {
            this.gameId = gameId;
            this.ts = ts;
            this.imei = imei;
            this.haloVersion = haloVersion;
            this.haloChannel = haloChannel;
        }
    
        public String getGameId() {
            return gameId;
        }
    
        public void setGameId(String gameId) {
            this.gameId = gameId;
        }
    
        public Long getTs() {
            return ts;
        }
    
        public void setTs(int ts) {
            this.ts = ts;
        }
    
        public String getImei() {
            return imei;
        }
    
        public void setImei(String imei) {
            this.imei = imei;
        }
    
        public String getHaloVersion() {
            return haloVersion;
        }
    
        public void setHaloVersion(String haloVersion) {
            this.haloVersion = haloVersion;
        }
    
        public String getHaloChannel() {
            return haloChannel;
        }
    
        public void setHaloChannel(String haloChannel) {
            this.haloChannel = haloChannel;
        }
    
        @Override
        public String toString() {
            return "SortDowComplete{" +
                    "gameId='" + gameId + '\'' +
                    ", ts=" + ts +
                    ", haloVersion='" + haloVersion + '\'' +
                    ", haloChannel='" + haloChannel + '\'' +
                    ", imei='" + imei + '\'' +
                    '}';
        }
        
    }
    
    
  • SortKeyByFunction.java
  • import org.apache.flink.api.java.functions.KeySelector;
    
    /**
     * @Author: fseast
     * @Date: 2020/3/24   11:14
     * @Description:
     */
    public class SortKeyByFunction implements KeySelector<SortDowComplete,Long> {
        @Override
        public Long getKey(SortDowComplete sortDowComplete) throws Exception {
            return sortDowComplete.getTs();
        }
    }
    
  • KeyTimeProcessFunction.java
  • import com.mongodb.client.model.UpdateOneModel;
    import com.mongodb.client.model.WriteModel;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.bson.Document;
    
    import java.util.*;
    
    /**
     * @Author: fseast
     * @Date: 2020/3/24   5:23
     * @Description:
     */
    public class KeyTimeProcessFunction extends KeyedProcessFunction<Long,SortDowComplete,List<WriteModel<? extends Document>>> {
        //private ValueState gameState1;
        //                 
        //    state,           ,state    MapState,key gameId,value (ga,   )
        private transient MapState<String,Integer> gameState;
        //    state,          ,
        private transient ValueState<Long> timerState;
        //          ,   、   imei,   gameID      
        private transient MapState<String,Integer> distinctState;
        
        
        //List list ;
    
        //Map map;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            //System.out.println("====");
            //super.open(parameters);
            gameState =  getRuntimeContext().getMapState(new MapStateDescriptor<>("gameState", String.class,Integer.class));
            timerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerState",Long.class));
            distinctState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Integer>("distinctState", String.class, Integer.class));
            
            //map = new HashMap<>();
            //list = new ArrayList<>();
            
        }
    
        @Override
        public void processElement(SortDowComplete sortDowComplete, Context context, Collector<List<WriteModel<? extends Document>>> collector) throws Exception {
            
            //set        
            TreeSet<SortGameDownloads> set = new TreeSet<>();
            
            //    ,  1000     。
            long oneTime = sortDowComplete.getTs() * 1000L;
            long watermark = context.timerService().currentWatermark() + 1L;
            
            //System.out.println(day);//null  1
            System.out.println("  process  :" + context.timerService().currentProcessingTime() + ",watermark    :"+ (context.timerService().currentWatermark()+1));
            System.out.println("sortDowComplete      :" + oneTime);
            //System.out.println("    state:"+ timerState.value());
            //         ,   value null
            //watermark                 ,         ,      。
            //watermark                     ,                    state           。
            //           ,          ,        ,
            if (oneTime > watermark && timerState.value() == null) {
                //                ,         
                long timerTs = oneTime + 129600000L;// 36   ,    3    ,5 0    
                context.timerService().registerEventTimeTimer(timerTs);
                //       state ,
                timerState.update(timerTs);
                System.out.println("        :" + timerTs);
                
            }
            
            //System.out.println("    state:"+ timerState.value());
            
            
            /*Iterator> iterator = gameState.iterator();
            while (iterator.hasNext()) {
                Map.Entry next = iterator.next();
                System.out.println("gameState key:"+next.getKey()+"==,value:"+next.getValue());
            }
            gameState.put(sortDowComplete.getGameId(),101);*/
           
           //        24  ,         ,  3    , 5     ,         
            long judge = oneTime + 172800000;
            
            //System.out.println("    :"+judge+" ,watermark:"+watermark);
            if (judge <= watermark){
                System.out.println("   ");
            }else {
                System.out.println("  ");
                //     ID         。
                Integer gameDowNum = gameState.get(sortDowComplete.getGameId());
                String stateKey = sortDowComplete.getGameId() + "_" + sortDowComplete.getImei();
                System.out.println(stateKey+",  "+distinctState.get(stateKey));
    
                //              ,        
                if (gameDowNum != null){
                    Integer dist = distinctState.get(stateKey);
                    //               ,       
                    if (dist == null || dist == 0){
                        gameDowNum +=1;
                        //            
                        gameState.put(sortDowComplete.getGameId(),gameDowNum);
                        //      
                        distinctState.put(stateKey,1);//     imei           
                    }
                } else if (gameDowNum == null || gameDowNum == 0){//                   ,      1,
                    gameState.put(sortDowComplete.getGameId(),1);
                    distinctState.put(stateKey,1);//     imei           
                }
    
                
    
                Iterator<Map.Entry<String, Integer>> iterator = gameState.iterator();
                
                while (iterator.hasNext()){
                    Map.Entry<String, Integer> next = iterator.next();
                    //System.out.println("key:"+next.getKey()+",value:"+next.getValue());
    
                    SortGameDownloads sortGameDownloads = new SortGameDownloads(sortDowComplete.getTs(),next.getKey(), next.getValue());
                    set.add(sortGameDownloads);
                    
                    
                }
                
                List<WriteModel<? extends Document>> writeModels = new ArrayList<>();
                
                //        
                SortGameDownloads first = set.first();
                first.setTop(1);
                writeModels.add(new UpdateOneModel(first.getQueryDoc(),first.getUpdateDoc(),first.getOption()));
                /*                  ,            
                Iterator iteSort = set.iterator();
                Integer i = 1;
                Integer temp = -1;
                while (iteSort.hasNext()) {
                    SortGameDownloads sortNext = iteSort.next();
                    if (i == 1){
                        temp = sortNext.getDownloads();
                        //    
                        sortNext.setTop(1);
                        //writeModels.add(new DeleteManyModel(sortNext.deleteFilter()));
                        writeModels.add(new UpdateOneModel(sortNext.getQueryDoc(),sortNext.getUpdateDoc(),sortNext.getOption()));
                        i = 2;
                    } else if (i == 2){//     
                        if (temp == sortNext.getDownloads()){//          
                            //    
                            sortNext.setTop(1);
                            writeModels.add(new UpdateOneModel(sortNext.getQueryDoc(),sortNext.getUpdateDoc(),sortNext.getOption()));
                            i = 2;
                        } else {
                            i ++;
                            break;
                        }
                    }
                    
                }*/
                System.out.println("set  ");
                System.out.println(first);
                System.out.println(set);
                System.out.println("set  ");
    
                
                /*for (SortGameDownloads sortGameDow : set) {
                    writeModels.add(new UpdateOneModel(sortGameDow.getQueryDoc(),sortGameDow.getUpdateDoc(),sortGameDow.getOption()));
                }*/
                System.out.println(writeModels);
                System.out.println("===1111111");
                
                collector.collect(writeModels);
            }
    
            
    
            System.out.println();
        }
    
        //  watermark               ,    
        //  keyBy   key            ,               
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<WriteModel<? extends Document>>> out) throws Exception {
            //super.onTimer(timestamp, ctx, out);
            //                  keyBy   state,        。
            gameState.clear();
            timerState.clear();
            distinctState.clear();
            System.out.println("==       :"+timestamp);
        }
    }
    
    
  • SortGameDownloads.java
  • import com.mongodb.client.model.UpdateOptions;
    import org.bson.Document;
    
    import java.util.Date;
    
    /**
     * @Author: fseast
     * @Date: 2020/3/26   5:24
     * @Description:
     */
    public class SortGameDownloads implements Comparable , IMongoUpdate {
        private Long time;
        private String gameId;
        private Integer downloads;
        private Integer top = -1;
    
        public SortGameDownloads(Long time, String gameId, Integer downloads) {
            this.time = time;
            this.gameId = gameId;
            this.downloads = downloads;
        }
    
        public Long getTime() {
            return time;
        }
    
        public void setTime(Long time) {
            this.time = time;
        }
    
        public String getGameId() {
            return gameId;
        }
    
        public void setGameId(String gameId) {
            this.gameId = gameId;
        }
    
        public Integer getDownloads() {
            return downloads;
        }
    
        public void setDownloads(Integer downloads) {
            this.downloads = downloads;
        }
    
        public Integer getTop() {
            return top;
        }
    
        public void setTop(Integer top) {
            this.top = top;
        }
    
        @Override
        public String toString() {
            return "SortGameDownloads{" +
                    "time=" + time +
                    ", gameId='" + gameId + '\'' +
                    ", downloads=" + downloads +
                    ", top=" + top +
                    '}';
        }
    
        @Override
        public int compareTo(Object o) {
            if (o instanceof SortGameDownloads){
                SortGameDownloads sort = (SortGameDownloads) o;
                //       ,    id  
                int num = -(this.downloads - sort.downloads);
                if (num == 0){
                    return this.gameId.compareTo(sort.gameId);
                }
                return num;
            }
            return 0;
        }
    
        @Override
        public Document getQueryDoc() {
            Document query  = new Document();
            //query.append("game_id",gameId);
            query.append("time",new Date(time * 1000L));
            
            return query;
        }
    
        @Override
        public Document getUpdateDoc() {
            Document inc = new Document();
            inc.append("downloads",downloads);
            inc.append("top",top);
            inc.append("game_id",gameId);
            return new Document("$set",inc);
        }
    
        @Override
        public UpdateOptions getOption() {
            return new UpdateOptions().upsert(true);
        }
        
        //    
        /*public Document deleteFilter(){
            Document delete = new Document();
            delete.append("time",new Date(time * 1000L));
            return delete;
        }*/
    }
    
    
  • IMongoUpdate.java
  • import com.mongodb.client.model.UpdateOptions;
    import org.bson.Document;
    
    
    public interface IMongoUpdate {
    
    	Document getQueryDoc();
    	Document getUpdateDoc();
    	UpdateOptions getOption();
    
    }