go同時非ブロックキャッシュ

17399 ワード

このセクションでは、現実世界でコンカレント・プログラムが現れても既存のライブラリが解決できない問題を解決するのに役立つ、ブロックされていないキャッシュを作成します.この問題をキャッシュ関数(memoizing)といいます.(訳注:Memoizationの定義:memoizationという言葉はDonald Michieがラテン語memorandumに基づいて捏造した言葉です.対応する動詞、過去分詞、ing形式はmemoiz、memoized、memoizing.)です.つまり、関数の戻り結果をキャッシュする必要があります.これにより、関数を呼び出すときに、計算を1回だけ必要とし、その後、計算結果を返すだけでいいのです.デルのソリューションは、同時セキュリティであり、キャッシュ全体のロックを回避し、すべての操作がロックの設計を争うことを回避します.
キャッシュする必要がある関数の一例として、次のhttpGetBody関数を使用します.この関数は、HTTP GETリクエストを行い、http応答bodyを取得します.この関数の呼び出し自体にオーバーヘッドが大きいので,不要なときに繰り返し呼び出さないようにする.
func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

最後の行には少し細部が隠されています.ReadAllは2つの結果を返し、1つの[]byte配列と1つのエラーを返しますが、この2つのオブジェクトはhttpGetBodyの戻り宣言のinterface{}とerrorタイプに割り当てられるので、結果を返すことができ、追加の作業を必要としません.この戻りタイプをhttpGetBodyで選択したのは、キャッシュと一致させるためです.
次に、私たちが設計するcacheの最初の「下書き」を示します.
gopl.io/ch9/memo1
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo

// A Memo caches the results of calling a Func.
type Memo struct {
    f     Func
    cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]result)}
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}

Memoインスタンスには、キャッシュが必要な関数f(タイプはFunc)と、キャッシュ内容(stringからresultマッピングまでのmap)が記録されます.各resultは、単純な関数が返す値のペアであり、値とエラー値です.続けてMemoの変種を示しますが、すべての例は上記の点に従います.
以下に、Memoを使用した例を示します.流入するURLの各要素についてGetを呼び出し、呼び出しの遅延とその返されるデータサイズのlogを印刷します.
m := memo.New(httpGetBody)
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }
    fmt.Printf("%s, %s, %d bytes
"
, url, time.Since(start), len(value.([]byte))) }

テストパッケージ(11章のトピック)を使用して、キャッシュの効果を系統的に同定することができます.次のテスト出力から、URLストリームにはいくつかの重複が含まれていることがわかります.URLの(*Memo).Getの呼び出しごとに数百ミリ秒かかりますが、2回目は1ミリ秒で完全なデータを返すことができます.
$ go test -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok  gopl.io/ch9/memo1   1.257s

このテストはすべての呼び出しを順番に行います.
このような相互に独立したHTTPリクエストはうまく同時化できるので,このテストを同時化形式に変更することができる.syncを使用できます.WaitGroupは、すべてのリクエストが完了してから戻るのを待っています.
m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        fmt.Printf("%s, %s, %d bytes
"
, url, time.Since(start), len(value.([]byte))) n.Done() }(url) } n.Wait()

今回のテストはもっと速く走ったが、残念なことに、このテストは毎回正常に動作するわけではないようだ.予期せぬcache miss(キャッシュがヒットしていない)や、キャッシュにヒットしたが誤った値を返したり、直接クラッシュしたりすることに気づきました.
しかし、さらに悪いことに、このプログラムが正しく実行されることもあるので、このプログラムにバグがあることに気づかないかもしれません.しかし、-raceというflagを使用してプログラムを実行することができ、競合検出器(§9.6)は次のようなレポートを印刷します.
$ go test -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s

memo.goの32行が2回出現し,2つのgoroutineが同期介入なしにcache mapを更新したことを示した.これはGetが同時セキュリティではなく,データ競合があることを示している.
28  func (memo *Memo) Get(key string) (interface{}, error) {
29      res, ok := memo.cache(key)
30      if !ok {
31          res.value, res.err = memo.f(key)
32          memo.cache[key] = res
33      }
34      return res.value, res.err
35  }

最も簡単なcacheを同時セキュリティにする方法は、モニタリングベースの同期を使用することです.Memoにmutexを追加し、Getの最初に反発ロックを取得し、returnのときにロックを解放すれば、cacheの操作を臨界領域で発生させることができます.
gopl.io/ch9/memo2
type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}

テストは並行して行われたが、今回は競争検査器が「沈黙」した.残念なことに、Memoに対するこの変更は、同時的なパフォーマンスの利点を完全に失いました.fへの呼び出しのたびにロックが保持され,Getは本来並列に実行可能なI/O動作をシリアル化する.この章の目的は、現在のようにすべてのリクエストをシリアル化する関数のキャッシュではなく、ロックされていないキャッシュを完了することです.
次のGetの実装では、Getを呼び出すgoroutineは2回ロックを取得します.検索フェーズは1回取得され、検索が何も返されない場合は、更新フェーズに入って再び取得されます.この2回のロック取得の中間段階では、他のgoroutineはcacheを自由に使用することができます.
gopl.io/ch9/memo3
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)

        // Between the two critical sections, several goroutines
        // may race to compute f(key) and update the map.
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}

これらの変更により、パフォーマンスは再び向上しましたが、いくつかのURLが2回取得されました.このことは、2つ以上のgoroutineが同じ時刻にGetを呼び出して同じURLを要求した場合に発生する.複数のgoroutineがcacheを一緒にクエリーし、値がないことを発見し、fという遅い関数を一緒に呼び出します.結果が得られたらmapも更新されます.いずれかの結果は、別の結果を上書きします.
理想的には余計な仕事は避けるべきだ.この「回避」作業は、一般にduplicate suppression(繰返し抑制/回避)と呼ばれる.次のバージョンのMemoの各map要素は、1つのエントリを指すポインタです.各エントリには、関数f呼び出し結果のコンテンツキャッシュが含まれる.これまでとは違って、今回のentryにはreadyというchannelが含まれています.エントリの結果が設定されると、このchannelは閉じられ、他のgoroutineブロードキャスト(§8.9)にエントリ内の結果を読み出すのは安全です.
gopl.io/ch9/memo4
type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

func New(f Func) *Memo {
    return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]*entry
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // This is the first request for this key.
        // This goroutine becomes responsible for computing
        // the value and broadcasting the ready condition.
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()

        e.res.value, e.res.err = memo.f(key)

        close(e.ready) // broadcast ready condition
    } else {
        // This is a repeat request for this key.
        memo.mu.Unlock()

        // wait for ready condition
    }
    return e.res.value, e.res.err
}

Get関数には、共有変数cache mapを保護するために反発ロックを取得し、mapに指定されたエントリがあるかどうかをクエリーし、見つからない場合は新しいエントリを挿入し、反発ロックを解放するステップが含まれます.エントリが存在し、その値が書き込み完了していない場合(すなわち、fという遅い関数を呼び出す他のgoroutineがある場合)、goroutineは値readyを待ってからエントリの結果を読む必要があります.一方、readyの有無を知りたければ、ready channelから直接読み取ることができます.この読み取り操作はchannelが閉じるまでブロックされていたからです.
エントリがない場合は、mapに準備されていないエントリを挿入する必要があります.現在呼び出されているgoroutineは、スロー関数の呼び出し、エントリの更新、および他のすべてのgoroutineへのエントリのready読み取り可能なメッセージのブロードキャストを担当する必要があります.
エントリのe.res.value変数とe.res.err変数は、複数のgoroutine間で共有されます.エントリを作成するgoroutineは、エントリの値も設定します.他のgoroutineは、readyのブロードキャストメッセージを受信すると、すぐにエントリの値を読み取ります.複数のgoroutineに同時にアクセスされますが、反発ロックは必要ありません.ready channelのクローズは、他のgoroutineがブロードキャストイベントを受信する前に必ず発生するので、最初のgoroutineのこれらの変数に対する書き込み操作は、これらの読み取り操作の前に必ず発生する.データ競合は発生しません.
これで同時、繰り返さず、ブロックのないcacheが完成します.
このようなMemoの実装では、複数のgoroutineがGetを呼び出したときの共有map変数を保護するために、1つの反発量が使用される.この設計を、Getを呼び出すときにメッセージを送信する必要があるmap変数を個別のmonitor goroutineに制限するスキームと比較してみてください.
Func、result、entryの宣言は、以前と一致しています.
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

しかし、Memoタイプにはrequestsというchannelが含まれており、Getの呼び出し元はこのchannelでmonitor goroutineと通信している.requests channelの要素タイプはrequestです.Getの呼び出し元はこの構造の2つのkeyのセットを埋め尽くし、実際にはこの2つの変数で関数をキャッシュします.もう一つのresponseというchannelが応答結果を送信します.このchannelは単独の値だけを返します.
gopl.io/ch9/memo5
// A request is a message requesting that the Func be applied to key.
type request struct {
    key      string
    response chan// the client wants a single result
}

type Memo struct{ requests chan request }
// New returns a memoization of f.  Clients must subsequently call Close.
func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)}
    go memo.server(f)
    return memo
}

func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests return res.value, res.err
}

func (memo *Memo) Close() { close(memo.requests) }

上のGetメソッドでは、response channelが作成され、request構造に格納され、monitor goroutineに送信され、すぐに受信されます.
Cache変数はmonitor goroutine `(*Memo).serverに制限されており、以下に示す.monitorは、request channelがCloseメソッドによって閉じられるまで、リクエストをループ中に読み取り続けます.各リクエストはcacheをクエリーし、エントリが見つからない場合は新しいエントリを作成/挿入します.
func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests {
        e := cache[req.key]
        if e == nil {
            // This is the first request for this key.
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key) // call f(key)
        }
        go e.deliver(req.response)
    }
}

func (e *entry) call(f Func, key string) {
    // Evaluate the function.
    e.res.value, e.res.err = f(key)
    // Broadcast the ready condition.
    close(e.ready)
}

func (e *entry) deliver(response chan// Wait for the ready condition.
    // Send the result to the client.
    response 

反発量ベースのバージョンと同様に、あるkeyに対する最初の要求は、関数fを呼び出してこのkeyに転送し、結果をエントリに存在させ、ready channelを閉じてエントリのreadyメッセージをブロードキャストする必要がある.(*entry).callを使用して、上記の作業を完了します.
次に、同じkeyのリクエストに対してmapにすでに存在するエントリが発見され、結果がreadyになるのを待ってresponseからクライアントのgoroutenに結果を送信します.上記の作業は(*entry).deliverで行われた.callメソッドとdeliverメソッドの呼び出しは、monitor goroutinesがそのためにブロックされず、新しいリクエストを処理できないことを保証するために、独自のgoroutineで行わなければならない.
この例は,ロックを用いても通信を用いても同時プログラムを確立することが可能であることを示している.
上記の2つの案は、特定のシナリオの下でどちらが良いかは言えませんが、彼らを理解するには価値があります.ある方法から別の方法に切り替えると、コードをより簡潔にすることができます.(注:golangは通信併発を推奨すると約束したのではないでしょうか)
練習9.3:Funcタイプと(*Memo).Getメソッドを拡張し、呼び出し元がオプションのdone channelを提供することをサポートし、このchannelを通じて操作全体をキャンセルする能力を備えさせる(§8.9).キャンセルされたFuncの呼び出し結果はキャッシュされるべきではありません.