Flink Data Streamブロードキャストステータスモード

34560 ワード

Flink Data Streamブロードキャストステータスモード
Stateを用いてOperator Stateを記述し,リカバリ時に並列度再割当Operator State(偶発分裂再割当方式)を変更したり,Union方式(連合再配布)を用いて並列タスクを復元したりすることができる.
Operator Stateにはブロードキャスト状態モード(Broadcast State)もあります.
ブロードキャスト状態の導入は、1つのストリームからのいくつかのデータがすべての下流タスクにブロードキャストされる必要がある例をサポートするためであり、ローカルに格納され、別のストリーム上のすべての受信要素を処理するために使用される.ブロードキャスト状態は自然フィッティングの例として現れることができ,ルールのセットを含む低スループットストリームを想像することができ,別のストリームからのすべての要素を評価することを望んでいる.
上記のタイプの使用例を考慮すると、ブロードキャスト状態が他のキャリア状態と異なる点は、以下の点である.
  • 彼はMapState
  • です
  • 特定のOperatorを入力ブロードキャストストリームおよび非ブロードキャストストリーム
  • としてのみ適用する.
  • のような事業者は、異なる名称の複数のブロードキャスト状態を有することができる.
  • Keyed StreamまたはNon-Keyed Streamを1つのBroadcastStreamに接続し、非ブロードキャストストリームは、connect()を呼び出すことによって完了し、そのBroadcastStreamをパラメータとして使用することができる.これはBroadcastConnectedStreamを返し、process()の方法で論理を処理することができます.Keyed Stream接続ブロードキャストストリームであれば、process()のパラメータはKeyedBroadcastProcessFunctionである必要があります.Non-Keyed Streamがブロードキャストストリームに接続されている場合、process()のパラメータはBroadcastProcessFunctionです.
    1.Keyed Stream接続ブロードキャストストリームの例:
    public class KeyedBroadcastStream {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            Properties p = new Properties();
            p.setProperty("bootstrap.servers", "localhost:9092");
    
            SingleOutputStreamOperator<User> user = env
                    .addSource(new FlinkKafkaConsumer010<String>("user", new SimpleStringSchema(), p))
                    .map((MapFunction<String, User>) value -> new Gson().fromJson(value, User.class));
    
            user.print("user: ");
    
            KeyedStream<Order, String> order = env
                    .addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p))
                    .map((MapFunction<String, Order>) value -> new Gson().fromJson(value, Order.class))
                    .keyBy((KeySelector<Order, String>) value -> value.userId);
    
            order.print("order: ");
    
            MapStateDescriptor<String, User> descriptor = new MapStateDescriptor<String, User>("user", String.class, User.class);
            org.apache.flink.streaming.api.datastream.BroadcastStream<User> broadcast = user.broadcast(descriptor);
    
            order
                    .connect(broadcast)
                    .process(new KeyedBroadcastProcessFunction<String, Order, User, String>() {
                        @Override
                        public void processElement(Order value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                            ReadOnlyBroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
                            //          key value
                            User user = broadcastState.get(value.userId);
                            if (user != null) {
                                Tuple8<String, String, String, Long, String, String, String, Long> result = new Tuple8<>(
                                        value.userId,
                                        value.orderId,
                                        value.price,
                                        value.timestamp,
                                        user.name,
                                        user.age,
                                        user.sex,
                                        user.createTime
                                );
                                String s = result.toString();
                                out.collect(s);
                            }
                        }
    
                        @Override
                        public void processBroadcastElement(User value, Context ctx, Collector<String> out) throws Exception {
                            BroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
                            broadcastState.put(value.userId, value);
                        }
                    })
                    .print("");
    
            env.execute("broadcast: ");
        }
    }
    

    2、Non-Keyd Stream接続ブロードキャストストリーム
    public class BroadcastStream {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            Properties p = new Properties();
            p.setProperty("bootstrap.servers", "localhost:9092");
    
            SingleOutputStreamOperator<User> user = env
                    .addSource(new FlinkKafkaConsumer010<String>("user", new SimpleStringSchema(), p))
                    .map(new MapFunction<String, User>() {
                        @Override
                        public User map(String value) throws Exception {
                            return new Gson().fromJson(value, User.class);
                        }
                    });
    
            user.print("user: ");
    
            SingleOutputStreamOperator<Order> order = env
                    .addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p))
                    .map(new MapFunction<String, Order>() {
                        @Override
                        public Order map(String value) throws Exception {
                            return new Gson().fromJson(value, Order.class);
                        }
                    });
    
            order.print("order: ");
    
            MapStateDescriptor<String, User> descriptor = new MapStateDescriptor<String, User>("user", String.class, User.class);
            org.apache.flink.streaming.api.datastream.BroadcastStream<User> broadcast = user.broadcast(descriptor);
            BroadcastConnectedStream<Order, User> connect = order.connect(broadcast);
    
            connect
                    .process(new BroadcastProcessFunction<Order, User, String>() {
                        @Override
                        public void processElement(Order value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                            ReadOnlyBroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
                            //          key value
                            User user = broadcastState.get(value.userId);
                            if (user != null) {
                                Tuple8<String, String, String, Long, String, String, String, Long> result = new Tuple8<>(
                                        value.userId,
                                        value.orderId,
                                        value.price,
                                        value.timestamp,
                                        user.name,
                                        user.age,
                                        user.sex,
                                        user.createTime
                                );
                                String s = result.toString();
                                out.collect(s);
                            }
                        }
    
                        @Override
                        public void processBroadcastElement(User value, Context ctx, Collector<String> out) throws Exception {
                            BroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
                            broadcastState.put(value.userId, value);
                        }
                    })
                    .print("result: ");
    
            env.execute("broadcast: ");
        }
    }