reactor解剖術(2)
前章のコードを見て、reactorはreader/writerに積極的に参加したことがないようですが、reactorはどのようにsocketを操作しますか?
reactorがrunの前に何をしたか再想像しますか?
そうだ、接続/接続を確立!
reactorのようにlistenTCP
def listenTCP(self, port, factory, backlog=50, interface=''):
p = tcp.Port(port, factory, backlog, interface, self)
p.startListening()
return p
tcpを見てPortのデザイン
tcp.Portはbaseに継承する.BasePortとtcp.SocketCloser,
そしてBasePortはabstractに継承する.FileDescriptor、抽象的なファイルオペレータクラス
tcp.Portインスタンス化ではあまり動作しませんでしたが、メソッドstartListeningに焦点を当てました.
# tcp.Port.startListening socket
# , startReading
def startListening(self):
try:
skt = self.createInternetSocket()
skt.bind((self.interface, self.port))
except socket.error, le:
raise CannotListenError, (self.interface, self.port, le)
# Make sure that if we listened on port 0, we update that to
# reflect what the OS actually assigned us.
self._realPortNumber = skt.getsockname()[1]
log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
# The order of the next 6 lines is kind of bizarre. If no one
# can explain it, perhaps we should re-arrange them.
self.factory.doStart()
skt.listen(self.backlog)
self.connected = True
self.socket = skt
self.fileno = self.socket.fileno
self.numberAccepts = 100
self.startReading()
# abstract.FileDescriptor.startReading
# reactor.addReader
def startReading(self):
"""Start waiting for read availability.
"""
self.reactor.addReader(self)
# selectreactor.SelectReactor.addReader
# tcp.Port reader reactor reads
def addReader(self, reader):
"""
Add a FileDescriptor for notification of data available to read.
"""
self._reads[reader] = 1
ここにいたのか?ListencTCPはreaderキューに組み込まれる.
振り返ってみろSelectReactor.doSelectでは、クラスファイルオペレータの状態が変化するとdoRead/doWriterメソッドが実行される.ではreaderとしてのtcpを見てみましょう.PortのdoReadメソッド
# tcp.Port socket ,
# self.factory.buildProtocol portocol
# self.transport tcp.Server
def doRead(self):
try:
if platformType == "posix":
numAccepts = self.numberAccepts
else:
# win32 event loop breaks if we do more than one accept()
# in an iteration of the event loop.
numAccepts = 1
for i in range(numAccepts):
# we need this so we can deal with a factory's buildProtocol
# calling our loseConnection
if self.disconnecting:
return
try:
skt, addr = self.socket.accept()
except socket.error, e:
if e.args[0] in (EWOULDBLOCK, EAGAIN):
self.numberAccepts = i
break
elif e.args[0] == EPERM:
# Netfilter on Linux may have rejected the
# connection, but we get told to try to accept()
# anyway.
continue
elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
log.msg("Could not accept new connection (%s)" % (
errorcode[e.args[0]],))
break
raise
protocol = self.factory.buildProtocol(self._buildAddr(addr))
if protocol is None:
skt.close()
continue
s = self.sessionno
self.sessionno = s+1
transport = self.transport(skt, protocol, addr, self, s, self.reactor)
transport = self._preMakeConnection(transport)
protocol.makeConnection(transport)
else:
self.numberAccepts = self.numberAccepts+20
except:
log.deferr()
まだぼんやりしていますが、protocolオブジェクトがここで生まれたことがわかりました.では、このtransportインスタンスの具体的な役割は何でしょうか.
まずprotocolを見てください.makeConnection
# protocol.BaseProtocol
def makeConnection(self, transport):
self.connected = 1
self.transport = transport
self.connectionMade()
おなじみの方法connectionMadeを見ました!
protocolの3つのイベントメソッドconnectionMade,dataReceived,connectionLostはprotocolの最も重要な3つのメソッドです.
その1つが現れ、残りの2つはどこでトリガーされたのでしょうか.
焦らないで、transportがどうなっているか見てみましょう.
tcp.Serverは親tcpから来る.Connection. 接続はabstractに継承されます.FileDescriptor、またクラスファイルです.
tcp.Serverインスタンスではちょっとしたアクションをしました
# tcp.Server
def __init__(self, sock, protocol, client, server, sessionno, reactor):
Connection.__init__(self, sock, protocol, reactor)
self.server = server
self.client = client
self.sessionno = sessionno
self.hostname = client[0]
self.logstr = "%s,%s,%s" % (self.protocol.__class__.__name__,
sessionno,
self.hostname)
self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
self.sessionno,
self.server._realPortNumber)
self.startReading()
self.connected = 1
self.startReading abstractからFileDescriptorでは、このインスタンスをreaderとしてreactorキューに組み込むことが知られている.
tcpを見てみましょうServerのdoReadメソッド
# tcp.Connection
def doRead(self):
"""Calls self.protocol.dataReceived with all available data.
This reads up to self.bufferSize bytes of data from its socket, then
calls self.dataReceived(data) to process it. If the connection is not
lost through an error in the physical recv(), this function will return
the result of the dataReceived call.
"""
try:
data = self.socket.recv(self.bufferSize)
except socket.error, se:
if se.args[0] == EWOULDBLOCK:
return
else:
return main.CONNECTION_LOST
if not data:
return main.CONNECTION_DONE
return self.protocol.dataReceived(data)
目の前が明るくなると、dataReceivedメソッド!