golangのsyncを1回覚えてください.Map同時作成、読み込みの問題

6861 ワード

背景:


zmq 4を用いて通信するgoを用いたプロジェクトがあり,簡単なrpcプロセスであり,初期の遠位端はmapを用いてipと具体的なsocketのマッピングを行った.

に質問


たぶんそうだろう
struct SocketMap {
    sync.Mutex
    sockets map[string]*zmq4.Socket
}

呼び出し時のコードは、次のようになります.
func (pushList *SocketMap) push(ip string, data []byte) {
    pushList.Lock()
    defer pushList.UnLock()
    socket := pushList.sockets[string]
    if socket == nil {
      socket := zmq4.NewSocket()
      //do some initial operation like connect
      pushList.sockets[ip] = socket
    }
    socket.Send(data)
}

pushが同時アクセスされると(実際にはpushが頻繁に同時アクセスされる)、この大きなロックの存在により、臨界領域で1つのコヒーレンスしか動作せず、効率が大幅に低下するという問題が見られると信じています.

ソリューション:crashの最適化をもたらす


syncを使うことにしましたMapはこの設計に取って代わって、それから第1版のコードを出して、書くのはとても簡単で、簡単な置換だけをしました:
struct SocketMap {
    sockets sync.Map
}

func (pushList *SocketMap) push(ip string, data []byte) {
    var socket *zmq4.Socket    
    socketInter, ok = pushList.sockets.Load(ip)
    if !ok {
      socket = zmq4.NewSocket()
      //do some initial operation like connect
      pushList.sockets.Store(ip, socket)
    } else {
      socket = socketInter.(*zmq4.Socket)
    }
    socket.Send(data)
}

一見問題ないように見えますか?しかし、走るといつも爆発し、ロゴを見ると、不法な住所があることを示した.後でgithubで見ましたzmq 4.Socketはスレッドが安全ではありません.上のコードは、複数のスレッドがsocketインスタンスを同時に取得し、crashになります.

解決策2:ロックを1つ追加しても止められない競合


そしてどうしますか?鍵をかけるしかないようですが、今回はmap全体に鍵をかけることはできません.そうしないと性能に問題があります.鍵の粒度を減らすことを考えましょう.鍵を使ってsocketを包装します.この時、私たちのコードも叫びました.
struct SocketMutex{
    sync.Mutex
    socket *zmq4.Socket
}
struct SocketMap {
    sockets sync.Map
}

func (pushList *SocketMap) push(ip string, data []byte) {
    var socket *SocketMutex    
    socketInter, ok = pushList.sockets.Load(ip)
    if !ok {
        socket = &{
          socket: zmq4.NewSocket()
        }
        //do some initial operation like connect
       pushList.sockets.Store(ip, newSocket)
    } else {
      socket = socketInter.(*SocketMutex)
    }
    socket.Lock()
    defer socket.Unlock()
    socket.socket.Send(data)
}

しかし、これはまだ問題があります.経験豊富な兄が一目で見ることができると信じています.問題はsocketInter, ok = pushList.sockets.Load(ip)という行のコードにあります.mapにこの値がなく、複数の協程が同時にこの行のコードにアクセスしている場合、明らかにこれらの協程のokはfalseに設定されていますが、その後、最初のifコードブロックに入り、複数のsocketインスタンスを作成し、元の値を上書きしようとしています.単純にこの問題を解決するのも簡単で、sync.Map.LoadOrStore(key interface{}, value interface{}) (v interface{}, loaded bool)というapiを使って、原子的に読み書きをします.しかし、これはまだ終わっていません.私たちの新しい値を書き込む操作はapiを呼び出してsocketを作成するだけでなく、一連の初期化操作もあります.初期化が完了する前に、このインスタンスをロードで取得した他のスレッドがsocketの実例に本当にアクセスできないことを保証しなければなりません.明らかにMapが持っているメカニズムはもうこの問題を解決できないので、ロックするかsyncするか、他の手段を探さなければなりません.WaitGroupかwhateverの他に何かありますか.

ソリューション3:パッケージを閉じることによる不思議な体験


その後、私はencoderにいました.goには次のコードが表示されます.
 346 func typeEncoder(t reflect.Type) encoderFunc {                                 
 347     if fi, ok := encoderCache.Load(t); ok {                                     
 348         return fi.(encoderFunc)                                                
 349     }                                                                          
 350                                                                                
 351     // To deal with recursive types, populate the map with an                  
 352     // indirect func before we build it. This type waits on the                
 353     // real func (f) to be ready and then calls it. This indirect              
 354     // func is only used for recursive types.                                  
 355     var (                                                                      
 356         wg sync.WaitGroup                                                      
 357         f  encoderFunc                                                         
 358     )                                                                          
 359     wg.Add(1)                                                                  
 360     fi, loaded := encoderCache.LoadOrStore(t, encoderFunc(func(e *encodeState, v reflect.Value, opts encOpts) {
 361         wg.Wait()                                                              
 362         f(e, v, opts)                                                          
 363     }))                                                                        
 364     if loaded {                                                                
 365         return fi.(encoderFunc)                                                
 366     }                                                                          
 367                                                                                
 368     // Compute the real encoder and replace the indirect func with it.         
 369     f = newTypeEncoder(t, true)                                                
 370     wg.Done()                                                                  
 371     encoderCache.Store(t, f)                                                   
 372     return f                                                                   
 373 }          

私たちはsyncにいることができます.Mapには閉パッケージ関数が格納され、閉パッケージ関数でローカルsyncを待つ.WaitGroupが完了してからインスタンスを返します.すると最終的なコードも成形されます.
struct SocketMutex{
    sync.Mutex
    socket *zmq4.Socket
}
struct SocketMap {
    sockets sync.Map
}

func (pushList *SocketMap) push(ip string, data []byte) {
    type SocketFunc func()*SocketMutex
    var (
        socket *SocketMutex
        w sync.WaitGroup
    )
    socket = &SocketMutex {
      socket : zmq4.NewSocket()
    }    
    w.Add(1)
    socketf, ok = pushList.sockets.LoadOrStore(ip, SocketFunc(func()*SocketMutex) {
        w.Wait()
        return socket
    })
    if !ok {
        socket = &{
          socket: zmq4.NewSocket()
        }
        //do some initial operation like connect
       w.Done()
    } else {
      socket = socketInter.(*SockeFunc)()
    }
    socket.Lock()
    defer socket.Unlock()
    socket.socket.Send(data)
}

まとめ:


コンカレントコードにおける競合問題は,各行のコードの再入性を熟考しなければならないだろう.総じて以下のガイドラインを維持します.
(1)アクセス不可のシステムリソース、例えばsocketfd,filefd,signalfd(実際にはこのようなシステムリソースの多くは再アクセス不可)などは、ロック構造のないコンテナ、読み書きロックパッケージのコンテナを使用する場合、各リソースに個別にロックをかけるか、またはその他の手段を用いてシステムリソースが臨界領域で有効に保護されることを保証する必要がある.
(2)読み取りがある場合、空であれば書き込むロジックは、原子性保証を提供できるLoadOrSave呼び出しを使用する必要があり、あるいはなければ、自ら実現しても読み取りと書き込みプロセス全体の原子性を保証しなければならない.同時にLoad呼び出しにアクセスすることを防止すると、複数のスレッドがNoを返して複数のインスタンスが作成され、保存時に互いに上書きされます.-この原則は,メンバーがシステムリソースである場合にのみ有効であるだけでなく,他のものが格納されている場合にも同様に適用される.
(3)リソースの作成が完了する、他の初期化プロセスが必要な場合は、容器内に閉パケットを配置することを考慮し、初期化プロセスはsyncを用いる.WaitGroup保護は、閉パッケージでWaitメソッドを呼び出して初期化が完了するまで待機し、他のスレッドに初期化されたインスタンスを返します.初期化プロセスが完了すると、閉パッケージ関数を置換し、Waitメソッドを呼び出さずに、可能なオーバーヘッドを削減できます.