Flink Data Stream Manager(管理)Operator Stateの簡単な使用
18559 ワード
Flink Data Stream Manager(管理)Operator Stateの簡単な使用
Manager(管理)
一、CheckpointFunction
checkpointを実行する必要があるたびに、
現在、リストスタイルの管理Operator Stateがサポートされています.ステータスは、互いに独立した**偶発分裂再割当:**各演算子はステータス要素のリストを返します.ステータス全体は論理的にすべてのリストの直列です.リカバリ/再配布時に、リストは平均的に並列演算子と同じ数のサブリストに分割されます.各演算子は、空または1つ以上の要素を含むサブリストを取得します.例えば、並列性1が使用される場合、演算子のチェックポイント状態は要素 **統合再割り当て:**各演算子はステータス要素のリストを返します.ステータス全体は論理的にすべてのリストの直列です.各演算子は、リカバリ/再割り当て時に完全なステータス要素のリストを取得します.
次に、要素を外部世界に送信する前にバッファリングするためのステータスのある例
リストア時にコンビネーション再配布スキームのリストステータスを使用する場合は、
二、ListCheckpointed
以下、カスタムSourceFunctionを使用してListCheckpointedインタフェースを実装します.
Manager(管理)
Operator State
を使用するには、CheckpointedFunction
インタフェースまたはListCheckpointed
インタフェースを実装する必要があります.一、CheckpointFunction
CheckpointedFunction
インタフェースは、異なる再配布Non-Keyed State
を有するアクセス状態を提供する.彼は2つの方法を実現する必要があります.void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
checkpointを実行する必要があるたびに、
snapshotState()
メソッドが呼び出されます.initializeState()
メソッドは、関数が最初に初期化されたとき、または初期のcheckpointから復元されたときに呼び出される.現在、リストスタイルの管理Operator Stateがサポートされています.ステータスは、互いに独立した
List
のシーケンス化されたオブジェクトであり、再スケールで再割り当てされることが予想される.すなわち、これらのオブジェクトは、非キー状態を再割り当てできる最も微細な粒度である.ステータス・アクセス・メソッドに基づいて、次の再配布スキームが定義されます.element1
およびelement2
を含み、並列性が2に増加すると、element1
は最終的に演算子インスタンス0パーティションにあり、element2
は演算子インスタンス1パーティションに移動する.次に、要素を外部世界に送信する前にバッファリングするためのステータスのある例
SinkFunction
を示す.基本的な偶数分割再配布リストのステータスを示します.public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element : bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>("buffered-elements", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {// isRestored:
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
リストア時にコンビネーション再配布スキームのリストステータスを使用する場合は、
CheckpointedFunction
メソッドを使用して取得できます.getUnoinListState(descriptor)
の場合、基本的な双晶分裂再分配スキームが使用されることを示すだけである.二、ListCheckpointed
getListState(descriptor)
インターフェースはCheckpointedFunctionの変種である.リカバリまたは分裂した再割り当てスキームのリスト式の状態のみをサポートします.2つの方法が必要です.List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
ListCheckpointed
の操作者が対象に応じてチェックポイントのリストを返し、snapshotState()
は処理後にこのようなリストを復元する.ステータスが再パーティション化されていない場合は、restoreState
のCollections.singletonList(MY_STATE)
にいつでも戻ることができます.以下、カスタムSourceFunctionを使用してListCheckpointedインタフェースを実装します.
public class CustomSourceFunction extends RichParallelSourceFunction<Long> implements ListCheckpointed<Long> {
/**
* current offset for exactly once semantics
*/
private Long offset = 0L;
/**
* flag for job cancellation
*/
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
return Collections.singletonList(offset);
}
@Override
public void restoreState(List<Long> state) {
for (Long s : state) {
offset = s;
}
}
}