Flinkのブロードキャスト変数とブロードキャストステータス
6972 ワード
1、dataStreamingのbroadcast
要素をすべてのパーティションにブロードキャストすると、データは繰り返し処理されます.
dataStream.broadcast()
2、機械レベルの放送
ブロードキャスト変数を使用すると、プログラマはtasksに変数のコピーを転送するのではなく、各マシンに読み取り専用のキャッシュ変数を1つ保持できます.ブロードキャスト変数が作成されると、クラスタノードに複数回渡す必要がなく、クラスタ内の任意のfunctionで実行できます.また、各ノードが取得した値が一致することを保証するために、ブロードキャスト変数を変更するべきではないことを覚えておいてください.一言で説明すると、共通の共有変数と理解でき、datasetデータセットをブロードキャストし、異なるtaskがノード上で取得でき、このデータは各ノード上に1部しか存在しない.broadcastを使用しない場合は、各ノードのtaskごとにdatasetデータセットをコピーする必要があり、メモリが浪費されます(つまり、1つのノードに複数のdatasetデータが存在する可能性があります).
3、放送状態
ブロードキャスト・ステータスは、2つのイベント・ストリームを特定の方法で組み合わせて処理するために使用することができる.最初のストリームのイベントは、キャリアのすべての並列インスタンスにブロードキャストされ、これらのインスタンスはそれらを状態に維持する.
4、使い方
(1)バッチ処理
計算結果:
python,18 java,17 java,17 kafka,null scala,20 redis,null
(2)ブロードキャストストリームを用いて,データストリームの動的構成を実現する(taskSlotはメモリ分離であるため,broadcastはTaskslotに1部ある)
要素をすべてのパーティションにブロードキャストすると、データは繰り返し処理されます.
dataStream.broadcast()
2、機械レベルの放送
ブロードキャスト変数を使用すると、プログラマはtasksに変数のコピーを転送するのではなく、各マシンに読み取り専用のキャッシュ変数を1つ保持できます.ブロードキャスト変数が作成されると、クラスタノードに複数回渡す必要がなく、クラスタ内の任意のfunctionで実行できます.また、各ノードが取得した値が一致することを保証するために、ブロードキャスト変数を変更するべきではないことを覚えておいてください.一言で説明すると、共通の共有変数と理解でき、datasetデータセットをブロードキャストし、異なるtaskがノード上で取得でき、このデータは各ノード上に1部しか存在しない.broadcastを使用しない場合は、各ノードのtaskごとにdatasetデータセットをコピーする必要があり、メモリが浪費されます(つまり、1つのノードに複数のdatasetデータが存在する可能性があります).
3、放送状態
ブロードキャスト・ステータスは、2つのイベント・ストリームを特定の方法で組み合わせて処理するために使用することができる.最初のストリームのイベントは、キャリアのすべての並列インスタンスにブロードキャストされ、これらのインスタンスはそれらを状態に維持する.
4、使い方
(1)バッチ処理
public static void main(String[] args) throws Exception{
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
//
ArrayList> broadData=new ArrayList<>();
broadData.add(new Tuple2<>("python",18));
broadData.add(new Tuple2<>("scala",20));
broadData.add(new Tuple2<>("java",17));
DataSource> dataBroadSource = env.fromCollection(broadData);
DataSet
計算結果:
python,18 java,17 java,17 kafka,null scala,20 redis,null
(2)ブロードキャストストリームを用いて,データストリームの動的構成を実現する(taskSlotはメモリ分離であるため,broadcastはTaskslotに1部ある)
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource filterData = env.addSource(new RichSourceFunction() {
private boolean isRunning = true;
//
String[] data = new String[]{"java", "python", "scala"};
/**
* , 1 ,
* @param cxt
* @throws Exception
*/
@Override
public void run(SourceContext cxt) throws Exception {
int size = data.length;
while (isRunning) {
TimeUnit.MINUTES.sleep(1);
int seed = (int) (Math.random() * size);
//
cxt.collect(data[seed]);
System.out.println(" :" + data[seed]);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
//1、 :
MapStateDescriptor configFilter = new MapStateDescriptor("configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
//2、 filterData
BroadcastStream broadcastConfig = filterData.setParallelism(1).broadcast(configFilter);
//
DataStreamSource dataStream = env.addSource(new RichSourceFunction () {
private boolean isRunning = true;
//
String[] data = new String[]{
"java ",
"python , ",
"php web ",
"scala , ",
"go 、 、 , "
};
/**
* , 3s
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext ctx) throws Exception {
int size = data.length;
while (isRunning) {
TimeUnit.SECONDS.sleep(3);
int seed = (int) (Math.random() * size);
//
ctx.collect(data[seed]);
System.out.println(" :" + data[seed]);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
//3、dataStream ( connect )
DataStream result = dataStream.connect(broadcastConfig).process(new BroadcastProcessFunction() {
//
private String keyWords = null;
/**
* open
*
* 4、 keyWords , :java.lang.NullPointerException
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
keyWords="java";
System.out.println(" keyWords:java");
}
/**
* 6、
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector out) throws Exception {
if (value.contains(keyWords)) {
out.collect(" :" + value + ", : :" + keyWords);
}
}
/**
*5、
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processBroadcastElement(String value, Context ctx, Collector out) throws Exception {
keyWords = value;
System.out.println(" :" + value);
}
});
result.print();
env.execute(StreamBroadcastDemo.class.getSimpleName());
}