golangのsyncを1回覚えてください.Map同時作成、読み込みの問題
6861 ワード
背景:
zmq 4を用いて通信するgoを用いたプロジェクトがあり,簡単なrpcプロセスであり,初期の遠位端はmapを用いてipと具体的なsocketのマッピングを行った.
に質問
たぶんそうだろう
struct SocketMap {
sync.Mutex
sockets map[string]*zmq4.Socket
}
呼び出し時のコードは、次のようになります.
func (pushList *SocketMap) push(ip string, data []byte) {
pushList.Lock()
defer pushList.UnLock()
socket := pushList.sockets[string]
if socket == nil {
socket := zmq4.NewSocket()
//do some initial operation like connect
pushList.sockets[ip] = socket
}
socket.Send(data)
}
pushが同時アクセスされると(実際にはpushが頻繁に同時アクセスされる)、この大きなロックの存在により、臨界領域で1つのコヒーレンスしか動作せず、効率が大幅に低下するという問題が見られると信じています.
ソリューション:crashの最適化をもたらす
syncを使うことにしましたMapはこの設計に取って代わって、それから第1版のコードを出して、書くのはとても簡単で、簡単な置換だけをしました:
struct SocketMap {
sockets sync.Map
}
func (pushList *SocketMap) push(ip string, data []byte) {
var socket *zmq4.Socket
socketInter, ok = pushList.sockets.Load(ip)
if !ok {
socket = zmq4.NewSocket()
//do some initial operation like connect
pushList.sockets.Store(ip, socket)
} else {
socket = socketInter.(*zmq4.Socket)
}
socket.Send(data)
}
一見問題ないように見えますか?しかし、走るといつも爆発し、ロゴを見ると、不法な住所があることを示した.後でgithubで見ましたzmq 4.Socketはスレッドが安全ではありません.上のコードは、複数のスレッドがsocketインスタンスを同時に取得し、crashになります.
解決策2:ロックを1つ追加しても止められない競合
そしてどうしますか?鍵をかけるしかないようですが、今回はmap全体に鍵をかけることはできません.そうしないと性能に問題があります.鍵の粒度を減らすことを考えましょう.鍵を使ってsocketを包装します.この時、私たちのコードも叫びました.
struct SocketMutex{
sync.Mutex
socket *zmq4.Socket
}
struct SocketMap {
sockets sync.Map
}
func (pushList *SocketMap) push(ip string, data []byte) {
var socket *SocketMutex
socketInter, ok = pushList.sockets.Load(ip)
if !ok {
socket = &{
socket: zmq4.NewSocket()
}
//do some initial operation like connect
pushList.sockets.Store(ip, newSocket)
} else {
socket = socketInter.(*SocketMutex)
}
socket.Lock()
defer socket.Unlock()
socket.socket.Send(data)
}
しかし、これはまだ問題があります.経験豊富な兄が一目で見ることができると信じています.問題は
socketInter, ok = pushList.sockets.Load(ip)
という行のコードにあります.mapにこの値がなく、複数の協程が同時にこの行のコードにアクセスしている場合、明らかにこれらの協程のokはfalseに設定されていますが、その後、最初のifコードブロックに入り、複数のsocketインスタンスを作成し、元の値を上書きしようとしています.単純にこの問題を解決するのも簡単で、sync.Map.LoadOrStore(key interface{}, value interface{}) (v interface{}, loaded bool)
というapiを使って、原子的に読み書きをします.しかし、これはまだ終わっていません.私たちの新しい値を書き込む操作はapiを呼び出してsocketを作成するだけでなく、一連の初期化操作もあります.初期化が完了する前に、このインスタンスをロードで取得した他のスレッドがsocketの実例に本当にアクセスできないことを保証しなければなりません.明らかにMapが持っているメカニズムはもうこの問題を解決できないので、ロックするかsyncするか、他の手段を探さなければなりません.WaitGroupかwhateverの他に何かありますか.ソリューション3:パッケージを閉じることによる不思議な体験
その後、私はencoderにいました.goには次のコードが表示されます.
346 func typeEncoder(t reflect.Type) encoderFunc {
347 if fi, ok := encoderCache.Load(t); ok {
348 return fi.(encoderFunc)
349 }
350
351 // To deal with recursive types, populate the map with an
352 // indirect func before we build it. This type waits on the
353 // real func (f) to be ready and then calls it. This indirect
354 // func is only used for recursive types.
355 var (
356 wg sync.WaitGroup
357 f encoderFunc
358 )
359 wg.Add(1)
360 fi, loaded := encoderCache.LoadOrStore(t, encoderFunc(func(e *encodeState, v reflect.Value, opts encOpts) {
361 wg.Wait()
362 f(e, v, opts)
363 }))
364 if loaded {
365 return fi.(encoderFunc)
366 }
367
368 // Compute the real encoder and replace the indirect func with it.
369 f = newTypeEncoder(t, true)
370 wg.Done()
371 encoderCache.Store(t, f)
372 return f
373 }
私たちはsyncにいることができます.Mapには閉パッケージ関数が格納され、閉パッケージ関数でローカルsyncを待つ.WaitGroupが完了してからインスタンスを返します.すると最終的なコードも成形されます.
struct SocketMutex{
sync.Mutex
socket *zmq4.Socket
}
struct SocketMap {
sockets sync.Map
}
func (pushList *SocketMap) push(ip string, data []byte) {
type SocketFunc func()*SocketMutex
var (
socket *SocketMutex
w sync.WaitGroup
)
socket = &SocketMutex {
socket : zmq4.NewSocket()
}
w.Add(1)
socketf, ok = pushList.sockets.LoadOrStore(ip, SocketFunc(func()*SocketMutex) {
w.Wait()
return socket
})
if !ok {
socket = &{
socket: zmq4.NewSocket()
}
//do some initial operation like connect
w.Done()
} else {
socket = socketInter.(*SockeFunc)()
}
socket.Lock()
defer socket.Unlock()
socket.socket.Send(data)
}
まとめ:
コンカレントコードにおける競合問題は,各行のコードの再入性を熟考しなければならないだろう.総じて以下のガイドラインを維持します.
(1)アクセス不可のシステムリソース、例えばsocketfd,filefd,signalfd(実際にはこのようなシステムリソースの多くは再アクセス不可)などは、ロック構造のないコンテナ、読み書きロックパッケージのコンテナを使用する場合、各リソースに個別にロックをかけるか、またはその他の手段を用いてシステムリソースが臨界領域で有効に保護されることを保証する必要がある.
(2)読み取りがある場合、空であれば書き込むロジックは、原子性保証を提供できるLoadOrSave呼び出しを使用する必要があり、あるいはなければ、自ら実現しても読み取りと書き込みプロセス全体の原子性を保証しなければならない.同時にLoad呼び出しにアクセスすることを防止すると、複数のスレッドがNoを返して複数のインスタンスが作成され、保存時に互いに上書きされます.-この原則は,メンバーがシステムリソースである場合にのみ有効であるだけでなく,他のものが格納されている場合にも同様に適用される.
(3)リソースの作成が完了する、他の初期化プロセスが必要な場合は、容器内に閉パケットを配置することを考慮し、初期化プロセスはsyncを用いる.WaitGroup保護は、閉パッケージでWaitメソッドを呼び出して初期化が完了するまで待機し、他のスレッドに初期化されたインスタンスを返します.初期化プロセスが完了すると、閉パッケージ関数を置換し、Waitメソッドを呼び出さずに、可能なオーバーヘッドを削減できます.