同時のタスクの数百万のタイミングホイールを実装します.
30586 ワード
遅延のあるプロセスタスクの多くを含むシステムに対して.我々はタスクを処理するためにタイマーの多くを使用する場合は、アイドルGoroutinesとたくさんのメモリが消費されます.グルテンの多くはまた、それらをスケジュールするより多くのCPUを消費する.
この記事ではTimeWheel オプション2のタスクのメンテナンスを減らす
我々の自身の使用を見ましょう
使い始めましょう
の実行関数
上記の「タイムホイール」のより視覚的な表現です
次のステップは
キーが存在するかどうかを調べます もし存在するなら さもなければ、呼び出しによって満員でキーをセットしてください だから
また、ソースコードを読むならば
上記の呼び出しチェーンから、すべてで呼ばれる機能があります: 遅延が簡単に短縮された場合は、古いタスクマーカーを削除し、リストに戻り、ループの次のラウンドを待つ
以前は初期化では、タイマー
に関しては
拡張時間内に
https://github.com/zeromicro/go-zero
使用にようこそ
この記事ではTimeWheel
go-zero
, これは開発者が多くの遅延タスクをスケジュールすることができます.遅延タスクについては、2つのオプションが通常利用可能です.Timer
, タイマーは1回限りのタスクに使用されます.それは将来の単一のイベントを表します.あなたはどのくらい待つのタイマーを教えてください、それはその時に通知されるチャンネルを提供します.TimingWheel
, これは、タスクグループの配列を維持し、各スロットは、格納されたタスクのチェーンを維持します.実行が開始されると、タイマは所定の間隔で1スロットのタスクを実行する.priority queue O(nlog(n))
to bidirectional linked table O(1)
, また、タスクの実行には、ある時点でタスクのポーリングだけが必要ですO(N)
, 元素を入れて取り除くO(nlog(n))
, 優先キューの場合と同様.我々の自身の使用を見ましょう
TimingWheel
インgo-zero
: timingwheel in cache
使い始めましょう
TimingWheel
にcache
of collection
.timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
key, ok := k.(string)
if !ok {
return
}
cache.Del(key)
})
if err ! = nil {
return nil, err
}
cache.timingWheel = timingWheel
これはcache
どちらが初期化TimingWheel
期限切れのキーをきれいにするために.interval
: タスクをチェックする時間間隔numSlots
: タイムスロットの数execute
: タスクを処理する関数cache
は終了したキーを削除し、この有効期限の呼び出しはTimingWheel
進める.初期設定
// The initialization of TimingWheel
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
*TimingWheel, error) {
tw := &TimingWheel{
interval: interval, // time frame interval
ticker: ticker, // the ticker to trigger the execution
slots: make([]*list.List, numSlots), // the slots to put tasks
timers: NewSafeMap(), // map to store task with {key, value}
tickedPos: numSlots - 1, // the previous ticked position
execute: execute, // execute function
numSlots: numSlots, // the number of slots
setChannel: make(chan timingEntry), // the channel to set tasks
moveChannel: make(chan baseEntry), // the channel to move tasks
removeChannel: make(chan interface{}), // the channel to remove tasks
drainChannel: make(chan func(key, value interface{})), // the channel to drain tasks
stopChannel: make(chan lang.PlaceholderType), // the channel to stop TimingWheel
}
// Prepare all the lists stored in the slot
tw.initSlots()
// start asynchronous concurrent process, use channel for task communication and passing
go tw.run()
return tw, nil
}
上記の「タイムホイール」のより視覚的な表現です
TimingWheel
, 詳細は後述する.go tw.run()
tick通知を行うためにGoroutineを作成します.func (tw *TimingWheel) run() {
for {
select {
// Timer does time push -> scanAndRunTasks()
case <-tw.ticker.Chan():
tw.onTick()
// add task will input task to setChannel
case task := <-tw.setChannel:
tw.setTask(&task)
...
}
}
}
ご覧の通り、timer
実行は初期設定で始まり、internal
タイムスロット、そして下にタスクをフェッチし続けるlist
にslot
そしてそれをexecute
実行.タスク操作
次のステップは
cache key
.func (c *Cache) Set(key string, value interface{}) {
c.lock.
_, ok := c.data[key]
c.data[key] = value
c.lruCache.add(key)
c.lock.Unlock()
expiry := c.unstableExpiry.AroundDuration(c.expiry)
if ok {
c.timingWheel.MoveTimer(key, expiry)
} else {
c.timingWheel.SetTimer(key, value, expiry)
}
}
data map
expire
呼び出しによってMoveTimer()
SetTimer()
TimingWheel
明らかに、開発者はadd
or update
彼らのニーズによって.また、ソースコードを読むならば
SetTimer()
, MoveTimer()
はタスクをチャネルに送信し、channel
Goroutineによって継続的に取り出されますrun()
.
SetTimer() -> setTask()
.
- not exist task:
getPostion -> pushBack to list -> setPosition
- exist task:
get from timers -> moveTask()
MoveTimer() -> moveTask()
上記の呼び出しチェーンから、すべてで呼ばれる機能があります:
moveTask()
func (tw *TimingWheel) moveTask(task baseEntry) {
// timers: Map => get [positionEntry "pos, task"] by key
val, ok := tw.timers.Get(task.key)
if !ok {
return
}
timer := val.(*positionEntry)
// {delay < interval} => delay is less than a time frame interval,
// the task should be executed immediately
if task.delay < tw.interval {
threading.GoSafe(func() {
tw.execute(timer.item.key, timer.item.value)
})
return
}
// If > interval, calculate the new pos, circle out of the time wheel by delaying the time
pos, circle := tw.getPositionAndCircle(task.delay)
if pos >= timer.pos {
timer.item.circle = circle
// Record the offset of the move
timer.item.diff = pos - timer.pos
} else if circle > 0 {
// move to the level of (circle-1)
circle --
timer.item.circle = circle
// because it's an array, add numSlots [which is the equivalent of going to the next level]
timer.item.diff = tw.numSlots + pos - timer.pos
} else {
// If offset is ahead of schedule, task is still on the first level
// mark the old task for deletion and requeue it for execution
timer.item.removed = true
newItem := &timingEntry{
baseEntry: task,
value: timer.item.value,
}
tw.slots[pos].PushBack(newItem)
tw.setTimerPosition(pos, newItem)
}
}
以上の工程は以下のような場合である.delay < internal
: 単一の精度であるので、このタスクが直ちに実行される必要があることを意味しますdelay
を返します.new >= old
: <newPos, newCircle, diff>
newCircle > 0
: diffを計算し、次のレベルに円を変換します実行する
以前は初期化では、タイマー
run()
前進し続ける、主にリストのタスクを通過するために進行するプロセスはexecute func
. タイマーの実行から始めましょう.// Timer 'execute every internal'
func (tw *TimingWheel) onTick() {
// update the current tick position on every execution
tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
// Get the chain of tasks stored in the tick position at this time
l := tw.slots[tw.tickedPos]
tw.scanAndRunTasks(l)
}
この直後に実行する方法ですexecute
.func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
// store the tasks {key, value} that currently need to be executed [the arguments needed by execute, passed in turn to execute]
var tasks []timingTask
for e := l.Front(); e ! = nil; {
task := e.Value.(*timingEntry)
// mark for deletion, do the real deletion in scan, delete data from map
if task.removed {
next := e.Next()
l.Remove(e)
tw.timers.Del(task.key)
e = next
continue
} else if task.circle > 0 {
// the current execution point has expired, but at the same time it's not in the first level,
// so even though the current level is done, it drops to the next level
// but it doesn't modify pos
task.circle--
e = e.Next()
continue
} else if task.diff > 0 {
// because the diff has already been marked, it needs to go into the queue again
next := e.Next()
l.Remove(e)
pos := (tw.tickedPos + task.diff) % tw.numSlots
tw.slots[pos].PushBack(task)
tw.setTimerPosition(pos, task)
task.diff = 0
e = next
continue
}
// the above cases are all non-executable cases, those that can be executed will be added to tasks
tasks = append(tasks, timingTask{
key: task.key,
value: task.value,
})
next := e.Next()
l.Remove(e)
tw.timers.Del(task.key)
e = next
}
// for range tasks, then just execute each task->execute
tw.runTasks(tasks)
}
特定の分岐状況はコメントで説明されますmoveTask()
, どこcircle
下降するdiff
計算は、関連する2つの関数のフォーカスです.に関しては
diff
計算、それは計算が含まれますpos, circle
.// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
// step = 15, pos = 14, circle = 0
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
steps := int(d / tw.interval)
pos = (tw.tickedPos + steps) % tw.numSlots
circle = (steps - 1) / tw.numSlots
return
}
The above process can be simplified to the following.
steps = d / interval pos = step % numSlots - 1 circle = (step - 1) / numSlots
要約する
TimingWheel
タイマーに依存して前方からの時間を駆動するにはdoubly linked list
現在の時刻枠で、execute
を実行します.運転するからinternal
固定された時間ステップ、60年代のタスクがあります.internal = 1s
, それで、それは59回のNOOPを走らせます.circle
レイヤリング、あなたは常にオリジナルを再利用することができますnumSlots
, タイマーが絶えずloop
サークルでサークル.タスクの任意の数の固定サイズのslots
. この設計は、追加のデータ構造を作成することなく、長い時間制限を破ることができます.Also in
go-zero
there are many practical toolkits, using them for improving service performance and development efficiency.
プロジェクトアドレス
https://github.com/zeromicro/go-zero
使用にようこそ
go-zero
私たちをサポートするスター!Reference
この問題について(同時のタスクの数百万のタイミングホイールを実装します.), 我々は、より多くの情報をここで見つけました https://dev.to/kevwan/implement-a-timing-wheel-for-millions-of-concurrent-tasks-30oiテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol