Flink Data Stream Manager(管理)Operator Stateの簡単な使用

18559 ワード

Flink Data Stream Manager(管理)Operator Stateの簡単な使用
Manager(管理)Operator Stateを使用するには、CheckpointedFunctionインタフェースまたはListCheckpointedインタフェースを実装する必要があります.
一、CheckpointFunctionCheckpointedFunctionインタフェースは、異なる再配布Non-Keyed Stateを有するアクセス状態を提供する.彼は2つの方法を実現する必要があります.
void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

checkpointを実行する必要があるたびに、snapshotState()メソッドが呼び出されます.initializeState()メソッドは、関数が最初に初期化されたとき、または初期のcheckpointから復元されたときに呼び出される.
現在、リストスタイルの管理Operator Stateがサポートされています.ステータスは、互いに独立したListのシーケンス化されたオブジェクトであり、再スケールで再割り当てされることが予想される.すなわち、これらのオブジェクトは、非キー状態を再割り当てできる最も微細な粒度である.ステータス・アクセス・メソッドに基づいて、次の再配布スキームが定義されます.
  • **偶発分裂再割当:**各演算子はステータス要素のリストを返します.ステータス全体は論理的にすべてのリストの直列です.リカバリ/再配布時に、リストは平均的に並列演算子と同じ数のサブリストに分割されます.各演算子は、空または1つ以上の要素を含むサブリストを取得します.例えば、並列性1が使用される場合、演算子のチェックポイント状態は要素element1およびelement2を含み、並列性が2に増加すると、element1は最終的に演算子インスタンス0パーティションにあり、element2は演算子インスタンス1パーティションに移動する.
  • **統合再割り当て:**各演算子はステータス要素のリストを返します.ステータス全体は論理的にすべてのリストの直列です.各演算子は、リカバリ/再割り当て時に完全なステータス要素のリストを取得します.

  • 次に、要素を外部世界に送信する前にバッファリングするためのステータスのある例SinkFunctionを示す.基本的な偶数分割再配布リストのステータスを示します.
    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
    
        private final int threshold;
    
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    
        private List<Tuple2<String, Integer>> bufferedElements;
    
        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }
    
        @Override
        public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Tuple2<String, Integer> element : bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>("buffered-elements", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
            }));
    
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    
            if (context.isRestored()) {// isRestored:          
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
    

    リストア時にコンビネーション再配布スキームのリストステータスを使用する場合は、CheckpointedFunctionメソッドを使用して取得できます.getUnoinListState(descriptor)の場合、基本的な双晶分裂再分配スキームが使用されることを示すだけである.
    二、ListCheckpointedgetListState(descriptor)インターフェースはCheckpointedFunctionの変種である.リカバリまたは分裂した再割り当てスキームのリスト式の状態のみをサポートします.2つの方法が必要です.
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    
    void restoreState(List<T> state) throws Exception;
    
    ListCheckpointedの操作者が対象に応じてチェックポイントのリストを返し、snapshotState()は処理後にこのようなリストを復元する.ステータスが再パーティション化されていない場合は、restoreStateCollections.singletonList(MY_STATE)にいつでも戻ることができます.
    以下、カスタムSourceFunctionを使用してListCheckpointedインタフェースを実装します.
    public class CustomSourceFunction extends RichParallelSourceFunction<Long> implements ListCheckpointed<Long> {
    
        /**
         * current offset for exactly once semantics
         */
        private Long offset = 0L;
    
        /**
         * flag for job cancellation
         */
        private volatile boolean isRunning = true;
    
        @Override
        public void run(SourceContext<Long> ctx) {
            final Object lock = ctx.getCheckpointLock();
    
            while (isRunning) {
                // output and state update are atomic
                synchronized (lock) {
                    ctx.collect(offset);
                    offset += 1;
                }
            }
        }
    
        @Override
        public void cancel() {
            isRunning = false;
        }
    
        @Override
        public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
            return Collections.singletonList(offset);
        }
    
        @Override
        public void restoreState(List<Long> state) {
            for (Long s : state) {
                offset = s;
            }
        }
    }