FlinkのTriggerとEvector

10943 ワード

一、概説
Flinkでevent-timeモードを使用する場合、デフォルトで提供されるwindowには、TumblingEventTimeWindows、SlidingEventTimeWindows、EventTimeSessionWindowなどがあります.これらはwindow operatorの一部であり、window assignerと呼ばれます.Windows operatorには4つのコンポーネントが含まれており、window assignerのほか、trigger、evictor、window processも含まれています.その作用は以下の通りである.
  • window assignerデータストリームのデータがどのwindow
  • に属するかを示す
  • triggerは、どの条件下でwindow計算をトリガするかを示し、データの処理時の時間およびイベントの特定の属性、
  • に基づいている.
  • evictorオプションコンポーネントは、windowが計算を実行する前または後に、
  • を使用するなど、windowのデータを削除します.
  • globalWindowの場合、このwindowのデフォルトtriggerは永遠にトリガーされないため、カスタムtriggerもevictorも実現し、計算済みのデータの一部を削除する必要があります.Windows process flinkのデフォルトにはReduceFunction,AggragateFunctionがあります.Windows ProcessFunction
  • をカスタマイズすることもできます.
    二、Triggerトリガ
  • ウィンドウトリガは、ウィンドウがウィンドウ内の要素をいつウィンドウ関数で処理するかを決定する.各ウィンドウディスペンサには、デフォルトのトリガがあります.
  • TriggerResult 4つの値:CONTINUE、FIRE、FIRE_AND_PURGE、PURGE; FIRE、FIRE_AND_PURGE区別:FIREトリガ計算が空でないウィンドウデータ、FIRE_AND_PURGE:計算をトリガーし、ウィンドウデータをクリアする;後のFunctionなどがユーザ自身のインクリメンタルメンテナンス状態を計算する場合、インクリメンタルデータのみを受け入れることができる場合はFIRE_AND_PURGE;
  • FIRE以降のFunctionではウィンドウ全体のデータを受けFIRE_AND_PURGEはインクリメンタルデータしか受信しません.特に、一部の大きなウィンドウのビッグデータ量のケースでは、データをクリーンアップしないとoom
  • になる可能性があります.
  • Flinkには、EventTimeTriggerウィンドウのデフォルトのTriigerが組み込まれており、watermarksメトリックのイベント時間の進捗に基づいてトリガーされます.ProcessingTimeTriggerウィンドウのデフォルトのTriigerは、処理時間に基づいてトリガーされます.CountTriggerは、ウィンドウ内の要素の数が所定の制限を超えるとトリガーされ、FIREはデータをクリーンアップしません.ContinuousEventTimeTriggerは一定時間ごとにトリガーされ、FIREはデータをクリーンアップしません.PurgingTriggerは、別のトリガのパラメータとして、消去機能(transforms it into a purging one)に変換します.

  • 次のコードは、各データが計算をトリガーし、ウィンドウデータを空にすることを示しています.
    class UtcTrigger() extends Trigger[MyTime, TimeWindow] {
      //             
      override def onElement(t: MyTime, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
        println("   onElement")
        TriggerResult.FIRE_AND_PURGE
      }
    
      //                 
      override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
        println("   onProcessingTime")
        TriggerResult.CONTINUE
      }
    
      //                 
      override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
        println("   onEventTime")
        TriggerResult.FIRE_AND_PURGE
      }
    
      //   (removal)     
      override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
        println("   clear")
      }
    }
    

    三、Evictor駆逐器
  • Flinkウィンドウモデルでは、ウィンドウディスペンサとトリガのほかにオプションの駆逐器(Evector)を指定することもできます.evictor(...)メソッドを使用して完了できます.
  • 駆逐器は、トリガがトリガーされた後、ウィンドウ関数が使用される前または後にウィンドウから要素を消去することができる.evictBefore()は、ウィンドウ関数の前に使用します.evictAfter()は、ウィンドウ関数の後に使用されます.ウィンドウ関数を使用する前に追い出された要素は処理されません.
  • Flinkには、3つの内蔵駆逐器があります.CountEveictor:ウィンドウでユーザーが指定した数の要素を維持し、ユーザーが指定した数より多い場合は、ウィンドウバッファの先頭から余分な要素を破棄します.DeltaEveictor:DeltaFunctionとしきい値を使用して、ウィンドウバッファ内の最後の要素と残りの各要素との差を計算し、しきい値以上の要素を削除します.TimeEveictor:ミリ秒単位の時間間隔(interval)をパラメータとして、与えられたウィンドウについて、要素の最大のタイムスタンプmax_tsを見つけ、max_ts-intervalより小さいタイムスタンプのすべての要素を削除します.
  • デフォルトでは、ウィンドウ関数の前に組み込まれたすべての駆逐器が使用されます.ウィンドウ内のすべての要素がウィンドウ計算の前に駆逐器に渡される必要があるため、駆逐器を指定すると、プレ集約(pre-aggregation)を回避できます.
  • Flinkは、ウィンドウ内の要素の順序を保証しません.これは、駆逐器がウィンドウの先頭から要素を除去できるが、これらの要素が先に着くか後に着くかは必ずしもそうではないことを意味する.
  • class MyEvictor() extends Evictor[MyTime, TimeWindow] {
      override def evictBefore(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        val ite: util.Iterator[TimestampedValue[MyTime]] = iterable.iterator()
        while (ite.hasNext) {
          val elment: TimestampedValue[MyTime] = ite.next()
          //                
          println("         :" + elment.getTimestamp)
          //          
          if (elment.getValue.timestamp <= 0) {
            ite.remove()
          }
        }
      }
    
      override def evictAfter(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
    
      }
    }