Flinkのブロードキャスト変数とブロードキャストステータス

6972 ワード

1、dataStreamingのbroadcast
要素をすべてのパーティションにブロードキャストすると、データは繰り返し処理されます.
dataStream.broadcast()
2、機械レベルの放送
ブロードキャスト変数を使用すると、プログラマはtasksに変数のコピーを転送するのではなく、各マシンに読み取り専用のキャッシュ変数を1つ保持できます.ブロードキャスト変数が作成されると、クラスタノードに複数回渡す必要がなく、クラスタ内の任意のfunctionで実行できます.また、各ノードが取得した値が一致することを保証するために、ブロードキャスト変数を変更するべきではないことを覚えておいてください.一言で説明すると、共通の共有変数と理解でき、datasetデータセットをブロードキャストし、異なるtaskがノード上で取得でき、このデータは各ノード上に1部しか存在しない.broadcastを使用しない場合は、各ノードのtaskごとにdatasetデータセットをコピーする必要があり、メモリが浪費されます(つまり、1つのノードに複数のdatasetデータが存在する可能性があります).
3、放送状態
ブロードキャスト・ステータスは、2つのイベント・ストリームを特定の方法で組み合わせて処理するために使用することができる.最初のストリームのイベントは、キャリアのすべての並列インスタンスにブロードキャストされ、これらのインスタンスはそれらを状態に維持する.
4、使い方
(1)バッチ処理
public static void main(String[] args) throws Exception{
    ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
    //        
    ArrayList> broadData=new ArrayList<>();
    broadData.add(new Tuple2<>("python",18));
    broadData.add(new Tuple2<>("scala",20));
    broadData.add(new Tuple2<>("java",17));
    DataSource> dataBroadSource = env.fromCollection(broadData);

    DataSet> baseData =dataBroadSource.map(new MapFunction, Map>() {
        @Override
        public Map map(Tuple2 stringIntegerTuple2) throws Exception {
            Map map=new HashMap<>();
            map.put(stringIntegerTuple2._1,stringIntegerTuple2._2);
            return map;
        }
    });
    DataSet  dataSource = env.fromElements("python", "java","java","kafka","scala","redis");

    DataSet  result =dataSource.map(new RichMapFunction() {
        Map allMap = new HashMap ();
        List> broadCastMap = new ArrayList>();
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.broadCastMap = getRuntimeContext().getBroadcastVariable("baseData");
            for (HashMap map : broadCastMap) {
                allMap.putAll(map);
            }
        }
        @Override
        public String map(String s) throws Exception {
            Integer age = allMap.get(s);
            return s + "," + age;
        }
    }).withBroadcastSet(baseData,"baseData");
    result.print();
}

計算結果:
python,18 java,17 java,17 kafka,null scala,20 redis,null
(2)ブロードキャストストリームを用いて,データストリームの動的構成を実現する(taskSlotはメモリ分離であるため,broadcastはTaskslotに1部ある)
public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource filterData = env.addSource(new RichSourceFunction() {
        private boolean isRunning = true;
        //     
        String[] data = new String[]{"java", "python", "scala"};
        /**
         *      , 1        ,       
         * @param cxt
         * @throws Exception
         */
        @Override
        public void run(SourceContext  cxt) throws Exception {
            int size = data.length;
            while (isRunning) {
                TimeUnit.MINUTES.sleep(1);
                int seed = (int) (Math.random() * size);
                //                 
                cxt.collect(data[seed]);
                System.out.println("       :" + data[seed]);
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    });
    //1、         :
    MapStateDescriptor configFilter = new MapStateDescriptor("configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    //2、 filterData    
    BroadcastStream broadcastConfig = filterData.setParallelism(1).broadcast(configFilter);
    //     
    DataStreamSource  dataStream = env.addSource(new RichSourceFunction () {
        private boolean isRunning = true;
        //     
        String[] data = new String[]{
                "java     ",
                "python    ,   ",
                "php web    ",
                "scala      ,            ",
                "go        、   、   ,              "
        };

        /**
         *      , 3s    
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext  ctx) throws Exception {
            int size = data.length;
            while (isRunning) {
                TimeUnit.SECONDS.sleep(3);
                int seed = (int) (Math.random() * size);
                //                 
                ctx.collect(data[seed]);
                System.out.println("       :" + data[seed]);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    });

    //3、dataStream          (  connect    )
    DataStream result = dataStream.connect(broadcastConfig).process(new BroadcastProcessFunction() {

        //      
        private String keyWords = null;

        /**
         * open        
         *             
         * 4、  keyWords    ,     :java.lang.NullPointerException
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            keyWords="java";
            System.out.println("   keyWords:java");
        }

        /**
         * 6、        
         * @param value
         * @param ctx
         * @param out
         * @throws Exception
         */
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector out) throws Exception {
            if (value.contains(keyWords)) {
                out.collect("    :" + value + ",   :       :" + keyWords);
            }
        }

        /**
         *5、          
         * @param value
         * @param ctx
         * @param out
         * @throws Exception
         */
        @Override
        public void processBroadcastElement(String value, Context ctx, Collector  out) throws Exception {
            keyWords = value;
            System.out.println("     :" + value);
        }
    });
    result.print();
    env.execute(StreamBroadcastDemo.class.getSimpleName());
}