golangはどのように多重非同期ioをブロックioに変換するか
6831 ワード
package main
import (
"net"
)
func handleConnection(c net.Conn) {
//
buffer := make([]byte, 1024)
c.Read(buffer)
c.Write([]byte("Hello from server"))
}
func main() {
l, err := net.Listen("tcp", "host:port")
if err != nil {
return
}
defer l.Close()
for {
c, err := l.Accept()
if err!= nil {
return
}
go handleConnection(c)
}
}
私たちが上のコードを書くのは簡単です.確かにgolangのネットワーク部分は私たちに多くのものを隠しています.私たちはc++のように下位のsocket関数を呼び出す必要はありません.epollなどの複雑なio多重化に関する論理を使う必要はありません.しかし、上のコードは本当に私たちが見たようにacceptとreadを呼び出すときにブロックされますか.
// Multiple goroutines may invoke methods on a Conn simultaneously.
// : goroutines ,
// goroutines , goroutines ,
// , 。
type Conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() Addr
RemoteAddr() Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}
type conn struct {
fd *netFD
}
この中にもう一つのConnインタフェースがあります.次のconnはこのインタフェースを実現しました.中にはメンバーnetFDが一人しかいません.
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex
// immutable until Close
sysfd int
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
// wait server
pd pollDesc
}
func (fd *netFD) accept() (netfd *netFD, err error) {
//................
for {
s, rsa, err = accept(fd.sysfd)
if err != nil {
nerr, ok := err.(*os.SyscallError)
if !ok {
return nil, err
}
switch nerr.Err {
/* EAGAIN Socket ,
fd.pd.WaitRead,*/
case syscall.EAGAIN:
if err = fd.pd.waitRead(); err == nil {
continue
}
case syscall.ECONNABORTED:
continue
}
return nil, err
}
break
}
//.........
// , go ,runtime fd_unix.go
return netfd, nil
}
上記のコードセグメントはaccept部分であり、ここではacceptにエラーが発生した場合、このエラーがsyscallであるかどうかを確認することに注意する.EAGAIN、もしそうであれば、WaitReadを呼び出して、このfd上の読み取りイベントが再び発生するまで、このfdを現在読んでいるgoroutineをここで待つ.このsocketに新しいデータが来ると、WaitRead呼び出しが返され、forループの実行が継続され、netFDを呼び出すReadの場所が同期「ブロック」になります.興味のあるnetFDの読み書き方法は、同じように実現されています.
ここまですべての疑問がpollDescに集中していますが、それはいったい何なのでしょうか.
const (
pdReady uintptr = 1
pdWait uintptr = 2
)
// Network poller descriptor.
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
lock mutex // protects the following fields
fd uintptr
closing bool
seq uintptr // protects from stale timers and ready notifications
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
user uint32 // user settable cookie
}
type pollCache struct {
lock mutex
first *pollDesc
}
pollDescネットワークポーリングは、Golangでsocketファイル記述子ごとに確立されたポーリングメカニズムです.ここでのポーリングは一般的なポーリングではなく、GolangのruntimeがgoroutineまたはGCのスケジューリングが完了した後または指定された時間内にepoll_を呼び出すwaitはIOイベントを生成するすべてのsocketファイル記述子を取得する.もちろんruntimeポーリングの前に、socketファイル記述子と現在のgoroutineに関する情報をepollメンテナンスのデータ構造に追加し、現在のgoroutineを保留する必要があります.IOが準備ができたら、epollから返されたファイル記述子とその中に付属するgoroutineの情報を通じて、現在のgoroutineの実行を再復元します.ここでpollDescには2つの変数wgとrgがありますが、実際には信号量と見なすことができます.この2つの変数にはいくつかの異なる状態があります.
pdReady:io準備完了
pdWait:現在のgoroutineは信号量に掛ける準備をしていますが、まだ掛けていません.
G pointer:現在のgoroutineへのポインタに変更すると、現在のgoroutineが停止します.
続けて上のWaitRead呼び出しに続き、goはここでいったい何をして現在のgoroutineを掛けたのだろうか.
func net_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// pollDesc
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
// true
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("netpollblock: double wait")
}
// gpp pdWait
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
}
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("netpollblock: corrupted state")
}
return old == pdReady
}
WaitReadが呼び出されると、上のnet_がアセンブリされて最も再呼び出されます.runtime_pollWait関数は、netpollblock関数をループ呼び出し、trueはioが準備されていることを示し、falseはエラーまたはタイムアウトを示し、netpollblockでgopark関数を呼び出し、gopark関数はmcallの関数を呼び出し、この関数はアセンブリで実現され、具体的な機能は現在のgoroutineを保留し、他の実行可能なgoroutineを実行することである.ここまでgoroutineがハングアップされる過程は終わりましたが、goroutineが読めるようになったときにどのようにgoroutineに通知するのか、これがepollのおかげです.
func netpoll(block bool) *g {
if epfd == -1 {
return nil
}
waitms := int32(-1)
if !block {
waitms = 0
}
var events [128]epollevent
retry:
// 128
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("epollwait failed")
}
goto retry
}
var gp guintptr
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
//
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
//
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// epoll data pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready(&gp, pd, mode)
}
}
if block && gp == 0 {
goto retry
}
return gp.ptr()
}
ここはおなじみのコードですが、epollの使用は、親民的に見えます.pd:=*(**pollDesc)(unsafe.Pointer(&ev.data))これは最も重要な文で、ここで現在の可読時間のpollDescを手に入れます.pollDescの読み書き信号量がG pointerに保存されると、現在のgoroutineがハングアップします.ここではnetpollready関数を呼び出し,関数では対応する読み書き信号量Gポインタを消去しpdReady,G-pointer状態を消去し,現在goroutineのGポインタを実行可能なキューに配置することでgoroutineが起動する.
tcp serverを書くとブロックされたネットワークモデルのように見えるが,その下層では実際には非同期多重化のメカニズムに基づいて実現され,ブロックioに似た開発モデルにカプセル化されているだけで,異歩io,多重化などの複雑な概念や混乱したコールバック関数に注目する必要がないことがわかる.