golang.org/x/time/rate使用説明


  • インタフェース紹介
  • type Limiter
  • func NewLimiter
  • func Limiter Allow
  • func Limiter AllowN
  • func Limiter Reserve
  • func Limiter ReserveN
  • func Limiter Wait
  • func Limiter WaitN
  • AllowN
  • のテスト
  • ReserveN
  • のテスト


    公式リンク本人簡書同期アドレス

    インタフェースの紹介


    type Limiter

    type Limiter struct {
        // contains filtered or unexported fields
    }

    Limterは時間の発生頻度を制限し,トークンプールのアルゴリズムを用いて実現した.この池の最初の容量はbで、b個のトークンをいっぱい入れて、それから毎秒中にr個のトークンを充填します.トークンプールには最大b個のトークンがあるため、1回に最大b個のイベントしか発生できず、1個のイベントに1個のトークンが費やされる.
    Limterは3つの主要な関数Allow,Reserve,and Waitを提供する.ほとんどの場合Waitを使用します.

    func NewLimiter

    func NewLimiter(r Limit, b int) *Limiter

    NewLimiterは新しいLimiterを返します.

    func (*Limiter) [Allow]

    func (lim *Limiter) Allow() bool

    Allowは関数AllowN(time.Now(),1)の簡略化関数である.

    func (*Limiter) AllowN

    func (lim *Limiter) AllowN(now time.Time, n int) bool

    AllowNは、時間nowのときにn個のイベントが同時に発生できるか否かを識別する(すなわち、nowのときにトークンプールからn個のトークンを取り出すことができるか否かを意味する).イベントが頻度を超えたときにイベントを破棄またはスキップする必要がある場合は、AllowNを使用します.そうでない場合はReserveまたはWaitを使用します.

    func (*Limiter) Reserve

    func (lim *Limiter) Reserve() *Reservation

    ReserveはReserveN(time.Now(),1).と呼びます.

    func (*Limiter) ReserveN

    func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

    ReserveNは、n個のイベントが発生するまで、呼び出し元がどのくらい待たなければならないかを示すオブジェクトReservationを返します(つまり、トークンプールに少なくともn個のトークンが含まれているかを意味します).
    ReserveNが入力nがトークンプールの容量bより大きい場合、falseが戻る.使用例は次のとおりです.
    r := lim.ReserveN(time.Now(), 1)
    if !r.OK() {
      // Not allowed to act! Did you remember to set lim.burst to be > 0 ? 1 false, b 0?
      return
    }
    time.Sleep(r.Delay())
    Act()

    この方法は、イベントを失わずにイベントの発生速度を周波数に応じて制限し、低減することを望む場合に使用される.ここで言いたいのは、イベントの発生頻度が呼び出し者によって制御されていれば、イベントの発生速度を失わずにReserveNで制御できるという意味だと思います.contextの締め切りまたはcancelメソッドを使用する場合はWaitNを使用します.

    func (*Limiter) Wait

    func (lim *Limiter) Wait(ctx context.Context) (err error)

    WaitはWaitN(ctx,1)の簡略化された形式である.

    func (*Limiter) WaitN

    func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

    WaitNはlimがn個のイベントの発生を許可するまでブロックします.-nがトークンプールの容量サイズを超えた場合、エラーが表示されます.-Contextがキャンセルされた場合、エラーが表示されます.-limの待ち時間がContextのタイムアウト時間を超えた場合はエラーとなります.


    AllowNのテスト

    package main
    
    import (
        "os"
        "time"
    
        "golang.org/x/time/rate"
    
        "github.com/op/go-logging"
    )
    
    var log = logging.MustGetLogger("example")
    
    // Example format string. Everything except the message has a custom color
    // which is dependent on the log level. Many fields have a custom output
    // formatting too, eg. the time returns the hour down to the milli second.
    var format = logging.MustStringFormatter(
        `%{color}%{time:15:04:05.000} %{shortfunc} ▶ %{level:.4s} %{id:03x}%{color:reset} %{message}`,
    )
    
    func main() {
    
        backend1 := logging.NewLogBackend(os.Stderr, "", 0)
        backend2 := logging.NewLogBackend(os.Stderr, "", 0)
        backend2Formatter := logging.NewBackendFormatter(backend2, format)
        backend1Leveled := logging.AddModuleLevel(backend1)
        backend1Leveled.SetLevel(logging.ERROR, "")
        logging.SetBackend(backend1Leveled, backend2Formatter)
    
        r := rate.Every(1)
        limit := rate.NewLimiter(r, 10)
        for {
            if limit.AllowN(time.Now(), 8) {
                log.Info("log:event happen")
            } else {
                log.Info("log:event not allow")
            }
    
        }
    
    }
    

    ReserveNのテスト


    参考YY兄さん
    package main
    
    import (
        "bytes"
        "fmt"
        "io"
        "time"
    
        "golang.org/x/time/rate"
    )
    
    type reader struct {
        r      io.Reader
        limiter *rate.Limiter
    }
    
    // Reader returns a reader that is rate limited by
    // the given token bucket. Each token in the bucket
    // represents one byte.
    func NewReader(r io.Reader, l *rate.Limiter) io.Reader {
        return &reader{
            r:      r,
            limiter:l,
        }
    }
    
    func (r *reader) Read(buf []byte) (int, error) {
        n, err := r.r.Read(buf)
        if n <= 0 {
            return n, err
        }
    
        now := time.Now()
        rv := r.limiter.ReserveN(now, n)
        if !rv.OK() {
            return 0, fmt.Errorf("Exceeds limiter's burst")
        }
        delay := rv.DelayFrom(now)
        //fmt.Printf("Read %d bytes, delay %d
    ", n, delay)
    time.Sleep(delay) return n, err } func main() { // Source holding 1MB src := bytes.NewReader(make([]byte, 1024*1024)) // Destination dst := &bytes.Buffer{} // Bucket adding 100KB every second, holding max 100KB limit := rate.NewLimiter(100*1024, 100*1024) start := time.Now() buf := make([]byte, 10*1024) // Copy source to destination, but wrap our reader with rate limited one //io.CopyBuffer(dst, NewReader(src, limit), buf) r := NewReader(src, limit) for{ if n, err := r.Read(buf); err == nil { dst.Write(buf[0:n]) }else{ break } } fmt.Printf("Copied %d bytes in %s
    "
    , dst.Len(), time.Since(start)) }