State Backends

4339 ワード

テキストリンク
Data Stream APIを使用して作成されたプログラムは、通常、異なる形式でステータスを保持します.
  • トリガ状態格納
  • まで、ウィンドウ内で要素を収集または集約する.
  • 変換関数は、key/value状態インタフェースを使用して要素
  • を格納することができる.
  • 変換関数は、CheckpointedFunctionインタフェースを実装して、それらのローカル変数が誤りを許容することができる.

  • 「Streams APIガイド」の「ステータス」セクションを参照してください.
    checkpointがアクティブ化されると、ステータスはcheckpointに永続化され、データ損失やシームレスなリカバリを防止します.ステータスが内部でどのように組織され、どのように永続化されるかは、選択したステータスバックエンドに依存します.
    オプションのステータスバックエンド
    Flinkの内部には、これらのステータスバックエンドがあります.
  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

  • 他の構成がない場合は、MemoryStateBackendが使用されます.
    MemoryStateBackend
    MemoryStateBackendは内部のデータをJavaスタックに保存します.Key/value状態とウィンドウオペレータは,格納値,トリガなどのハッシュテーブルを持つ.このステータスバックエンドは、checkpointを実行すると、現在のステータスをスナップショットし、checkpoint ACKメッセージの一部としてJobManager(master)に送信し、そのスタックに格納します.MemoryStateBackendでは、非同期スナップショットを使用する方法を設定できます.非同期スナップショットを使用してパイプの詰まりを回避することを強く奨励しますが、これは新しい特性であり、デフォルトでは有効ではありません.この状態を有効にするために、ユーザは、MemoryStateBackendの初期化時に、コンストラクション関数内の対応するブール識別子をtrueに設定することができる.
        new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
    

    MemoryStateBackendの限界:
  • 単一状態のサイズはデフォルトで最大5 MBです.この値はMemoryStateBackendコンストラクション関数によって増加することができる.
  • 構成の最大状態サイズにかかわらず、状態のサイズはakkaフレームサイズ(Configuration参照)
  • を超えてはならない.
  • 集約状態は、JobManagerのメモリに
  • を格納する必要があります.
    MemoryStateBackend:
  • ローカル開発およびデバッグ
  • は、record-at-a-time関数(Map,FlatMap,Filter,...)のみからなる小さな状態のジョブのみである.Kafka消費者は非常に小さな状態しか必要ありません.

  • FsStateBackend
    FsStateBackendでは、ファイルシステムのURL(タイプ、アドレス、パス)を使用します.たとえば、「hdfs://namenode:40010/flink/checkpoints」または「file:///data/flink/checkpoints”.
    FsStateBackendは、TaskManagerのメモリにin-flightデータを格納します.checkpointを行うと、ステータススナップショットが構成されたファイルシステムとディレクトリに書き込まれます.最小メタデータは、JobManagerのメモリに格納されます(または、高可用性モードでは、メタデータcheckpointに格納されます).
    FsStateBackendのデフォルトでは、書き込みステータスcheckpointで処理パイプがブロックされないように、非同期スナップショットが使用されます.このプロパティを無効にするには、MemoryStateBackendを初期化するときに、コンストラクション関数の対応するブールIDをfalseに設定します.たとえば、次のようにします.
        new FsStateBackend(path, false);
    

    FsStateBackendは次のように適用されます.
  • 大きな状態、長いウィンドウ、大きなkey/value状態を有するジョブ
  • すべての高可用性設定
  • RocksDBStateBackend
    RocksDBSStateBackendでは、ファイルシステムのURL(タイプ、アドレス、パス)を使用します.たとえば、「hdfs://namenode:40010/flink/checkpoints」または「file:///data/flink/checkpoints”.
    RocksDBSStateBackendはin-flightデータをRocksDBデータベースに格納し、TaskManagerのdataディレクトリに格納します(デフォルト).checkpointの場合、RocksDBデータベース全体が構成されたファイルシステムとディレクトリの下にcheckpointされます.最小メタデータは、JobManagerのメモリに格納されます(または、高可用性モードでは、メタデータcheckpointに格納されます).
    RocksDBSStateBackendは常に非同期スナップショットを実行します.
    RocksDBSStateBackendの制限:
  • RocksDBとしてのJNIブリッジAPIはbyte[]に基づいており、各keyおよびvalueの最大サポートサイズは2^31バイトである.重要:RocksDBでマージ操作を使用した状態(例えば、ListState)は、値のsizeが2^31バイトよりも大きく、次回の検索で失敗することがあります.これは現在のRocksDB JNIの制限です.

  • RocksDBSStateBackendは次のように使用できます.
  • 大きな状態、長いウィンドウ、大きなkey/value状態を有するジョブ
  • すべての高可用性設定
  • 注意:保持できる状態の数は、ディスクの空き領域のサイズに限られます.これにより、メモリに状態を保存するFsStateBackendよりも、非常に大きな状態を維持できます.しかしながら、これは、ステータスバックエンドのスループットよりも、達成可能な最大のスループットが低いことを意味する.
    RocksDBSStateBackendは、現在、インクリメンタルcheckpointを提供する唯一のステータスバックエンドです(ここを参照).
    ステータスバックエンドの構成
    指定しない場合は、デフォルトのステータスバックエンドはJobManagerです.クラスタ内のすべてのジョブにデフォルト以外のステータスバックエンドを作成する場合は、flink-conf.yamlで新しいデフォルトバックエンドを指定できます.デフォルトのステータスバックエンドは、以下に示すように、ジョブごとに上書きすることができる.
    ジョブ・レベルのステータス・バックエンドの設定
    作業の状態後端は、以下の例に示すように、作業中のStreamExecutionEnvironmentにより設定される.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    

    デフォルトのバックエンドの設定
    デフォルトのステータスバックエンドは、flink-conf.yamlstate.backendの値を設定することによって指定できます.
    可能な構成項目はjobmanager(MemoryStateBackend)、filesystem(FsStateBackend)、rocksdb(RocksDBSStateBackend)、またはステータスバックエンドファクトリFsStateBackendFactoryを実現したクラスの完全限定クラス名です.たとえば、RocksDBSStateBackendはorg.apache.flink.contrib.streaming.state.RocksDBStateBackendFactoryに設定されています.
    プロファイルの例は次のとおりです.
    # The backend that will be used to store operator state checkpoints
    
    state.backend: filesystem
    
    # Directory for storing checkpoints
    
    state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints