Flink Data Streamブロードキャストステータスモード
34560 ワード
Flink Data Streamブロードキャストステータスモード
Stateを用いてOperator Stateを記述し,リカバリ時に並列度再割当Operator State(偶発分裂再割当方式)を変更したり,Union方式(連合再配布)を用いて並列タスクを復元したりすることができる.
Operator Stateにはブロードキャスト状態モード(Broadcast State)もあります.
ブロードキャスト状態の導入は、1つのストリームからのいくつかのデータがすべての下流タスクにブロードキャストされる必要がある例をサポートするためであり、ローカルに格納され、別のストリーム上のすべての受信要素を処理するために使用される.ブロードキャスト状態は自然フィッティングの例として現れることができ,ルールのセットを含む低スループットストリームを想像することができ,別のストリームからのすべての要素を評価することを望んでいる.
上記のタイプの使用例を考慮すると、ブロードキャスト状態が他のキャリア状態と異なる点は、以下の点である.彼は です特定のOperatorを入力ブロードキャストストリームおよび非ブロードキャストストリーム としてのみ適用する.のような事業者は、異なる名称の複数のブロードキャスト状態を有することができる.
1.Keyed Stream接続ブロードキャストストリームの例:
2、Non-Keyd Stream接続ブロードキャストストリーム
Stateを用いてOperator Stateを記述し,リカバリ時に並列度再割当Operator State(偶発分裂再割当方式)を変更したり,Union方式(連合再配布)を用いて並列タスクを復元したりすることができる.
Operator Stateにはブロードキャスト状態モード(Broadcast State)もあります.
ブロードキャスト状態の導入は、1つのストリームからのいくつかのデータがすべての下流タスクにブロードキャストされる必要がある例をサポートするためであり、ローカルに格納され、別のストリーム上のすべての受信要素を処理するために使用される.ブロードキャスト状態は自然フィッティングの例として現れることができ,ルールのセットを含む低スループットストリームを想像することができ,別のストリームからのすべての要素を評価することを望んでいる.
上記のタイプの使用例を考慮すると、ブロードキャスト状態が他のキャリア状態と異なる点は、以下の点である.
MapState
Keyed Stream
またはNon-Keyed Stream
を1つのBroadcastStream
に接続し、非ブロードキャストストリームは、connect()
を呼び出すことによって完了し、そのBroadcastStream
をパラメータとして使用することができる.これはBroadcastConnectedStream
を返し、process()
の方法で論理を処理することができます.Keyed Stream
接続ブロードキャストストリームであれば、process()
のパラメータはKeyedBroadcastProcessFunction
である必要があります.Non-Keyed Stream
がブロードキャストストリームに接続されている場合、process()
のパラメータはBroadcastProcessFunction
です.1.Keyed Stream接続ブロードキャストストリームの例:
public class KeyedBroadcastStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties p = new Properties();
p.setProperty("bootstrap.servers", "localhost:9092");
SingleOutputStreamOperator<User> user = env
.addSource(new FlinkKafkaConsumer010<String>("user", new SimpleStringSchema(), p))
.map((MapFunction<String, User>) value -> new Gson().fromJson(value, User.class));
user.print("user: ");
KeyedStream<Order, String> order = env
.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p))
.map((MapFunction<String, Order>) value -> new Gson().fromJson(value, Order.class))
.keyBy((KeySelector<Order, String>) value -> value.userId);
order.print("order: ");
MapStateDescriptor<String, User> descriptor = new MapStateDescriptor<String, User>("user", String.class, User.class);
org.apache.flink.streaming.api.datastream.BroadcastStream<User> broadcast = user.broadcast(descriptor);
order
.connect(broadcast)
.process(new KeyedBroadcastProcessFunction<String, Order, User, String>() {
@Override
public void processElement(Order value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
// key value
User user = broadcastState.get(value.userId);
if (user != null) {
Tuple8<String, String, String, Long, String, String, String, Long> result = new Tuple8<>(
value.userId,
value.orderId,
value.price,
value.timestamp,
user.name,
user.age,
user.sex,
user.createTime
);
String s = result.toString();
out.collect(s);
}
}
@Override
public void processBroadcastElement(User value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
broadcastState.put(value.userId, value);
}
})
.print("");
env.execute("broadcast: ");
}
}
2、Non-Keyd Stream接続ブロードキャストストリーム
public class BroadcastStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties p = new Properties();
p.setProperty("bootstrap.servers", "localhost:9092");
SingleOutputStreamOperator<User> user = env
.addSource(new FlinkKafkaConsumer010<String>("user", new SimpleStringSchema(), p))
.map(new MapFunction<String, User>() {
@Override
public User map(String value) throws Exception {
return new Gson().fromJson(value, User.class);
}
});
user.print("user: ");
SingleOutputStreamOperator<Order> order = env
.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p))
.map(new MapFunction<String, Order>() {
@Override
public Order map(String value) throws Exception {
return new Gson().fromJson(value, Order.class);
}
});
order.print("order: ");
MapStateDescriptor<String, User> descriptor = new MapStateDescriptor<String, User>("user", String.class, User.class);
org.apache.flink.streaming.api.datastream.BroadcastStream<User> broadcast = user.broadcast(descriptor);
BroadcastConnectedStream<Order, User> connect = order.connect(broadcast);
connect
.process(new BroadcastProcessFunction<Order, User, String>() {
@Override
public void processElement(Order value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
// key value
User user = broadcastState.get(value.userId);
if (user != null) {
Tuple8<String, String, String, Long, String, String, String, Long> result = new Tuple8<>(
value.userId,
value.orderId,
value.price,
value.timestamp,
user.name,
user.age,
user.sex,
user.createTime
);
String s = result.toString();
out.collect(s);
}
}
@Override
public void processBroadcastElement(User value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, User> broadcastState = ctx.getBroadcastState(descriptor);
broadcastState.put(value.userId, value);
}
})
.print("result: ");
env.execute("broadcast: ");
}
}