同時のタスクの数百万のタイミングホイールを実装します.


遅延のあるプロセスタスクの多くを含むシステムに対して.我々はタスクを処理するためにタイマーの多くを使用する場合は、アイドルGoroutinesとたくさんのメモリが消費されます.グルテンの多くはまた、それらをスケジュールするより多くのCPUを消費する.
この記事ではTimeWheelgo-zero , これは開発者が多くの遅延タスクをスケジュールすることができます.遅延タスクについては、2つのオプションが通常利用可能です.
  • Timer , タイマーは1回限りのタスクに使用されます.それは将来の単一のイベントを表します.あなたはどのくらい待つのタイマーを教えてください、それはその時に通知されるチャンネルを提供します.
  • TimingWheel , これは、タスクグループの配列を維持し、各スロットは、格納されたタスクのチェーンを維持します.実行が開始されると、タイマは所定の間隔で1スロットのタスクを実行する.
  • オプション2のタスクのメンテナンスを減らすpriority queue O(nlog(n)) to bidirectional linked table O(1) , また、タスクの実行には、ある時点でタスクのポーリングだけが必要ですO(N) , 元素を入れて取り除くO(nlog(n)) , 優先キューの場合と同様.
    我々の自身の使用を見ましょうTimingWheel インgo-zero :

    timingwheel in cache


    使い始めましょうTimingWheelcache of collection .
    timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
      key, ok := k.(string)
      if !ok {
        return
      }
      cache.Del(key)
    })
    if err ! = nil {
      return nil, err
    }
    
    cache.timingWheel = timingWheel
    
    これはcache どちらが初期化TimingWheel 期限切れのキーをきれいにするために.
  • interval : タスクをチェックする時間間隔
  • numSlots : タイムスロットの数
  • execute : タスクを処理する関数
  • の実行関数cache は終了したキーを削除し、この有効期限の呼び出しはTimingWheel 進める.

    初期設定


    // The initialization of TimingWheel
    func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
      *TimingWheel, error) {
      tw := &TimingWheel{
        interval: interval, // time frame interval
        ticker: ticker, // the ticker to trigger the execution
        slots: make([]*list.List, numSlots), // the slots to put tasks
        timers: NewSafeMap(), // map to store task with {key, value}
        tickedPos: numSlots - 1, // the previous ticked position
        execute: execute, // execute function
        numSlots: numSlots, // the number of slots
        setChannel: make(chan timingEntry), // the channel to set tasks
        moveChannel: make(chan baseEntry), // the channel to move tasks
        removeChannel: make(chan interface{}), // the channel to remove tasks
        drainChannel: make(chan func(key, value interface{})), // the channel to drain tasks
        stopChannel: make(chan lang.PlaceholderType), // the channel to stop TimingWheel
      }
      // Prepare all the lists stored in the slot
      tw.initSlots()
      // start asynchronous concurrent process, use channel for task communication and passing
      go tw.run()
    
      return tw, nil
    }
    

    上記の「タイムホイール」のより視覚的な表現ですTimingWheel , 詳細は後述する.go tw.run() tick通知を行うためにGoroutineを作成します.
    func (tw *TimingWheel) run() {
      for {
        select {
        // Timer does time push -> scanAndRunTasks()
        case <-tw.ticker.Chan():
          tw.onTick()
        // add task will input task to setChannel
        case task := <-tw.setChannel:
          tw.setTask(&task)
          ...
        }
      }
    }
    
    ご覧の通り、timer 実行は初期設定で始まり、internal タイムスロット、そして下にタスクをフェッチし続けるlistslot そしてそれをexecute 実行.

    タスク操作


    次のステップはcache key .
    func (c *Cache) Set(key string, value interface{}) {
      c.lock.
      _, ok := c.data[key]
      c.data[key] = value
      c.lruCache.add(key)
      c.lock.Unlock()
    
      expiry := c.unstableExpiry.AroundDuration(c.expiry)
      if ok {
        c.timingWheel.MoveTimer(key, expiry)
      } else {
        c.timingWheel.SetTimer(key, value, expiry)
      }
    }
    
  • キーが存在するかどうかを調べますdata map
  • もし存在するならexpire 呼び出しによってMoveTimer()
  • さもなければ、呼び出しによって満員でキーをセットしてくださいSetTimer()
  • だからTimingWheel 明らかに、開発者はadd or update 彼らのニーズによって.
    また、ソースコードを読むならばSetTimer() , MoveTimer() はタスクをチャネルに送信し、channel Goroutineによって継続的に取り出されますrun() .

    SetTimer() -> setTask().

    • not exist task: getPostion -> pushBack to list -> setPosition
    • exist task: get from timers -> moveTask()

    MoveTimer() -> moveTask()


    上記の呼び出しチェーンから、すべてで呼ばれる機能があります:moveTask()
    func (tw *TimingWheel) moveTask(task baseEntry) {
      // timers: Map => get [positionEntry "pos, task"] by key
      val, ok := tw.timers.Get(task.key)
      if !ok {
        return
      }
    
      timer := val.(*positionEntry)
      // {delay < interval} => delay is less than a time frame interval,
      // the task should be executed immediately
      if task.delay < tw.interval {
        threading.GoSafe(func() {
          tw.execute(timer.item.key, timer.item.value)
        })
        return
      }
      // If > interval, calculate the new pos, circle out of the time wheel by delaying the time
      pos, circle := tw.getPositionAndCircle(task.delay)
      if pos >= timer.pos {
        timer.item.circle = circle
        // Record the offset of the move
        timer.item.diff = pos - timer.pos
      } else if circle > 0 {
        // move to the level of (circle-1)
        circle --
        timer.item.circle = circle
        // because it's an array, add numSlots [which is the equivalent of going to the next level]
        timer.item.diff = tw.numSlots + pos - timer.pos
      } else {
        // If offset is ahead of schedule, task is still on the first level
        // mark the old task for deletion and requeue it for execution
        timer.item.removed = true
        newItem := &timingEntry{
          baseEntry: task,
          value: timer.item.value,
        }
        tw.slots[pos].PushBack(newItem)
        tw.setTimerPosition(pos, newItem)
      }
    }
    
    以上の工程は以下のような場合である.
  • delay < internal : 単一の精度であるので、このタスクが直ちに実行される必要があることを意味します
  • delay を返します.
  • new >= old : <newPos, newCircle, diff>
  • newCircle > 0 : diffを計算し、次のレベルに円を変換します
  • 遅延が簡単に短縮された場合は、古いタスクマーカーを削除し、リストに戻り、ループの次のラウンドを待つ
  • 実行する


    以前は初期化では、タイマーrun() 前進し続ける、主にリストのタスクを通過するために進行するプロセスはexecute func . タイマーの実行から始めましょう.
    // Timer 'execute every internal'
    func (tw *TimingWheel) onTick() {
      // update the current tick position on every execution
      tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
      // Get the chain of tasks stored in the tick position at this time
      l := tw.slots[tw.tickedPos]
      tw.scanAndRunTasks(l)
    }
    
    この直後に実行する方法ですexecute .
    func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
      // store the tasks {key, value} that currently need to be executed [the arguments needed by execute, passed in turn to execute]
      var tasks []timingTask
    
      for e := l.Front(); e ! = nil; {
        task := e.Value.(*timingEntry)
        // mark for deletion, do the real deletion in scan, delete data from map
        if task.removed {
          next := e.Next()
          l.Remove(e)
          tw.timers.Del(task.key)
          e = next
          continue
        } else if task.circle > 0 {
          // the current execution point has expired, but at the same time it's not in the first level,
          // so even though the current level is done, it drops to the next level
          // but it doesn't modify pos
          task.circle--
          e = e.Next()
          continue
        } else if task.diff > 0 {
          // because the diff has already been marked, it needs to go into the queue again
          next := e.Next()
          l.Remove(e)
          pos := (tw.tickedPos + task.diff) % tw.numSlots
          tw.slots[pos].PushBack(task)
          tw.setTimerPosition(pos, task)
          task.diff = 0
          e = next
          continue
        }
        // the above cases are all non-executable cases, those that can be executed will be added to tasks
        tasks = append(tasks, timingTask{
          key: task.key,
          value: task.value,
        })
        next := e.Next()
        l.Remove(e)
        tw.timers.Del(task.key)
        e = next
      }
      // for range tasks, then just execute each task->execute
      tw.runTasks(tasks)
    }
    
    特定の分岐状況はコメントで説明されますmoveTask() , どこcircle 下降するdiff 計算は、関連する2つの関数のフォーカスです.
    に関してはdiff 計算、それは計算が含まれますpos, circle .
    // interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
    // step = 15, pos = 14, circle = 0
    func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
      steps := int(d / tw.interval)
      pos = (tw.tickedPos + steps) % tw.numSlots
      circle = (steps - 1) / tw.numSlots
      return
    }
    

    The above process can be simplified to the following.

    steps = d / interval
    pos = step % numSlots - 1
    circle = (step - 1) / numSlots
    

    要約する

  • TimingWheel タイマーに依存して前方からの時間を駆動するにはdoubly linked list 現在の時刻枠で、execute を実行します.運転するからinternal 固定された時間ステップ、60年代のタスクがあります.internal = 1s , それで、それは59回のNOOPを走らせます.
  • 拡張時間内にcircle レイヤリング、あなたは常にオリジナルを再利用することができますnumSlots , タイマーが絶えずloop サークルでサークル.タスクの任意の数の固定サイズのslots . この設計は、追加のデータ構造を作成することなく、長い時間制限を破ることができます.
  • Also in go-zero there are many practical toolkits, using them for improving service performance and development efficiency.


    プロジェクトアドレス


    https://github.com/zeromicro/go-zero
    使用にようこそgo-zero 私たちをサポートするスター!