PythonのTwistedフレームワークにおけるreactorイベント理解
16712 ワード
PythonのTwistedフレームワークにおけるreactorイベントマネージャの使い方を詳しく理解する
本文は主にPythonのTwistedフレームワークのreactorイベントマネージャの使い方を詳しく説明し、Twistedは人気のある非同期Python開発フレームワークである.
多くの実践の中で、私たちはいつも同様の方法で非同期プログラミングを使用しているようです.
リスニングイベントイベント発生実行対応コールバック関数コールバック完了(新しいイベントがリスニングキューに追加される可能性がある)1に戻ると、リスニングイベント従って、このような非同期モードをReactorモードと呼ぶ.例えばiOS開発におけるRun Loopの概念は、実際にはReactor loopと非常に類似しており、メインスレッドのRun Loopは画面UIイベントをリスニングし、UIイベントが発生すると対応するイベント処理コードを実行し、GCDなどの方法でメインスレッドにイベントを生成して実行することもできる.
上図はboostのReactorモードの描写で、Twistedの設計はこのようなReactorモードに基づいて、Twistedプログラムはイベントを待って、イベントを処理する過程で絶えず循環します.
?
reactorはTwistedプログラムの単一のオブジェクトです.
reactor reactorはイベントマネージャで、イベントを登録、ログアウトし、イベントループを実行し、イベントが発生したときにコールバック関数処理を呼び出す.reactorについて以下の結論があります.
Twistedのreactorはreactorを呼び出すしかない.run()で起動します.
reactorループは、開始したプロセスで実行されます.つまり、メインプロセスで実行されます.
いったん起動すると、ずっと動いていきます.reactorは、プログラムの制御下(または、そのスレッドを起動する制御下)にあります.
ReactorサイクルはCPUのリソースを消費しません.
明示的にreactorを作成する必要はなく、導入するだけでOKです.
最後の1つははっきり説明する必要がある.Twistedでは、reactorはSingleton(すなわち、単一のモード)であり、1つのプログラムに1つのreactorしか存在せず、それを導入すればそれに応じて1つを作成する.上記の方法はtwistedのデフォルトで使用される方法です.もちろん、twistedにはreactorを導入できる他の方法があります.例えばtwisted.internet.pollreactorのシステム呼び出しはselectメソッドの代わりにpollを呼び出す.
他のreactorを使用する場合はtwistedを導入する必要があります.internet.reactorの前にインストールします.次にpollreactorをインストールする方法を示します.
?
他の特殊なreactorをインストールしていない場合はtwistedを導入します.internet.reactor、Twistedはオペレーティングシステムに基づいてデフォルトのreactorをインストールします.そのため、デフォルトのreactorをインストールしないように、最上位のモジュールにreactorを導入するのではなく、reactorを使用する領域にインストールするのが習慣です.次はpollreactorを使用して上のプログラムを書き直します.
?
ではreactorはどのようにして単例を実現しますか?ちょっと見てインターネットimport reactorはどんなことをしたのか分かります.
次はtwisted/internet/reactorです.pyの部分コード:
?
注意:Pythonのメモリにロードされているすべてのモジュールはsysに配置されています.modules、グローバル辞書です.importモジュールが1つある場合、まずこのリストでモジュールがロードされているかどうかを検索します.ロードされている場合は、importを呼び出しているモジュールのネーミングスペースにモジュールの名前を追加するだけです.ロードされていない場合はsysから.pathディレクトリではモジュール名に従ってモジュールファイルを検索し、見つけたらモジュールをメモリにロードしsysに追加する.modulesで、名前を現在のネーミングスペースにインポートします.
もし私たちが初めてfrom twistedを実行したら.インターネットimport reactor、sys.modulesにはまだtwistedがありません.internet.reactor、だからreactoryを実行します.pyのコードで、デフォルトのreactorをインストールします.その後、導入するとsys.modulesには既にモジュールが存在するのでsysを直接modulesのtwisted.internet.reactorは現在のネーミングスペースにインポートされます.
defaultのinstall:
明らかにdefaultではプラットフォームに基づいて対応するinstallが取得されます.Linuxではまずepollreactorが使用され、カーネルがサポートされていない場合はpollreactorしか使用できません.Macプラットフォームはpollreactor、windowsはselectreactorを使用します.各installの実装はそれほど悪くありませんが、ここではselectreactorのinstallを抽出してみましょう.
implementerはSelectReactorがIreactorFDSetインタフェースを実現した方法を表し、ここではzopeを用いる.interface、それはpythonの中のインタフェースの実現で、興味のある学生は見に行くことができます.
IreactorFDSetインタフェースは主に記述子の取得、追加、削除などの操作の方法である.これらの方法は名前を見れば意味がわかるので、注釈をつけませんでした.
例のreactor.ListenTCP()は、親クラスPosixReactorBaseのメソッドであるリスニングイベントを登録します.
論理全体が簡単で、通常のserver側と同様に、ソケット、バインド、リスニングを作成します.異なるのは、reactorの読み取りセットにソケットの記述子を追加することです.ではclientが接続されているとreactorが監視し、イベントハンドラがトリガーされます.
reacotr.run()イベントメインループ
mianLoopは最終的な主ループであり,ループではdoIterationメソッドを呼び出して読み書き記述子の集合を監視し,記述子が読み書きの準備ができていることを発見すると対応するイベントハンドラを呼び出す.
クライアントに接続要求があると、リードセット中のtcpが呼び出される.PortのdoReadメソッド.
?
doReadメソッドでは、acceptを呼び出してクライアントデータを受信するためのソケットを生成し、ソケットをtransportにバインドし、transportをreactorの読み取りセットに追加する.クライアントにデータが来ると、transportのdoReadメソッドを呼び出してデータ読み込みを行います.
Connectionは、サーバ(transportインスタンスのクラス)の親であり、doReadメソッドを実装します.
_DataReceivedでは、例でカスタマイズしたEchoProtocolのdataReceivedメソッドを呼び出してデータを処理します.
この文書では、Twisted reactorプロセスを概略的に分析し、リスニングイベントの作成からクライアントデータの受信までを終了します.
本文は主にPythonのTwistedフレームワークのreactorイベントマネージャの使い方を詳しく説明し、Twistedは人気のある非同期Python開発フレームワークである.
多くの実践の中で、私たちはいつも同様の方法で非同期プログラミングを使用しているようです.
リスニングイベントイベント発生実行対応コールバック関数コールバック完了(新しいイベントがリスニングキューに追加される可能性がある)1に戻ると、リスニングイベント従って、このような非同期モードをReactorモードと呼ぶ.例えばiOS開発におけるRun Loopの概念は、実際にはReactor loopと非常に類似しており、メインスレッドのRun Loopは画面UIイベントをリスニングし、UIイベントが発生すると対応するイベント処理コードを実行し、GCDなどの方法でメインスレッドにイベントを生成して実行することもできる.
上図はboostのReactorモードの描写で、Twistedの設計はこのようなReactorモードに基づいて、Twistedプログラムはイベントを待って、イベントを処理する過程で絶えず循環します.
?
from twisted.internet import reactor
reactor.run()
reactorはTwistedプログラムの単一のオブジェクトです.
reactor reactorはイベントマネージャで、イベントを登録、ログアウトし、イベントループを実行し、イベントが発生したときにコールバック関数処理を呼び出す.reactorについて以下の結論があります.
Twistedのreactorはreactorを呼び出すしかない.run()で起動します.
reactorループは、開始したプロセスで実行されます.つまり、メインプロセスで実行されます.
いったん起動すると、ずっと動いていきます.reactorは、プログラムの制御下(または、そのスレッドを起動する制御下)にあります.
ReactorサイクルはCPUのリソースを消費しません.
明示的にreactorを作成する必要はなく、導入するだけでOKです.
最後の1つははっきり説明する必要がある.Twistedでは、reactorはSingleton(すなわち、単一のモード)であり、1つのプログラムに1つのreactorしか存在せず、それを導入すればそれに応じて1つを作成する.上記の方法はtwistedのデフォルトで使用される方法です.もちろん、twistedにはreactorを導入できる他の方法があります.例えばtwisted.internet.pollreactorのシステム呼び出しはselectメソッドの代わりにpollを呼び出す.
他のreactorを使用する場合はtwistedを導入する必要があります.internet.reactorの前にインストールします.次にpollreactorをインストールする方法を示します.
?
from twisted.internet import pollreactor
pollreactor.install()
他の特殊なreactorをインストールしていない場合はtwistedを導入します.internet.reactor、Twistedはオペレーティングシステムに基づいてデフォルトのreactorをインストールします.そのため、デフォルトのreactorをインストールしないように、最上位のモジュールにreactorを導入するのではなく、reactorを使用する領域にインストールするのが習慣です.次はpollreactorを使用して上のプログラムを書き直します.
?
from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()
ではreactorはどのようにして単例を実現しますか?ちょっと見てインターネットimport reactorはどんなことをしたのか分かります.
次はtwisted/internet/reactorです.pyの部分コード:
# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()
?
注意:Pythonのメモリにロードされているすべてのモジュールはsysに配置されています.modules、グローバル辞書です.importモジュールが1つある場合、まずこのリストでモジュールがロードされているかどうかを検索します.ロードされている場合は、importを呼び出しているモジュールのネーミングスペースにモジュールの名前を追加するだけです.ロードされていない場合はsysから.pathディレクトリではモジュール名に従ってモジュールファイルを検索し、見つけたらモジュールをメモリにロードしsysに追加する.modulesで、名前を現在のネーミングスペースにインポートします.
もし私たちが初めてfrom twistedを実行したら.インターネットimport reactor、sys.modulesにはまだtwistedがありません.internet.reactor、だからreactoryを実行します.pyのコードで、デフォルトのreactorをインストールします.その後、導入するとsys.modulesには既にモジュールが存在するのでsysを直接modulesのtwisted.internet.reactorは現在のネーミングスペースにインポートされます.
defaultのinstall:
# twisted/internet/selectreactor.py:
def install():
"""Configure the twisted mainloop to be run using the select() reactor.
"""
#
reactor = SelectReactor()
from twisted.internet.main import installReactor
installReactor(reactor)
# twisted/internet/main.py:
def installReactor(reactor):
"""
Install reactor C{reactor}.
@param reactor: An object that provides one or more IReactor* interfaces.
"""
# this stuff should be common to all reactors.
import twisted.internet
import sys
if 'twisted.internet.reactor' in sys.modules:
raise error.ReactorAlreadyInstalledError("reactor already installed")
twisted.internet.reactor = reactor
sys.modules['twisted.internet.reactor'] = reactor?
明らかにdefaultではプラットフォームに基づいて対応するinstallが取得されます.Linuxではまずepollreactorが使用され、カーネルがサポートされていない場合はpollreactorしか使用できません.Macプラットフォームはpollreactor、windowsはselectreactorを使用します.各installの実装はそれほど悪くありませんが、ここではselectreactorのinstallを抽出してみましょう.
# twisted/internet/selectreactor.py:
def install():
"""Configure the twisted mainloop to be run using the select() reactor.
"""
#
reactor = SelectReactor()
from twisted.internet.main import installReactor
installReactor(reactor)
# twisted/internet/main.py:
def installReactor(reactor):
"""
Install reactor C{reactor}.
@param reactor: An object that provides one or more IReactor* interfaces.
"""
# this stuff should be common to all reactors.
import twisted.internet
import sys
if 'twisted.internet.reactor' in sys.modules:
raise error.Reacto
installReactorでsys.modules twistedを追加internet.reactorキー、値は再installで作成された単一のreactorです.後でreactorを使用すると、この例がインポートされます.SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)
implementerはSelectReactorがIreactorFDSetインタフェースを実現した方法を表し、ここではzopeを用いる.interface、それはpythonの中のインタフェースの実現で、興味のある学生は見に行くことができます.
IreactorFDSetインタフェースは主に記述子の取得、追加、削除などの操作の方法である.これらの方法は名前を見れば意味がわかるので、注釈をつけませんでした.
# twisted/internet/interfaces.py
class IReactorFDSet(Interface):
def addReader(reader):
def addWriter(writer):
def removeReader(reader):
def removeWriter(writer):
def removeAll():
def getReaders():
def getWriters():
reactor.listenTCP()
例のreactor.ListenTCP()は、親クラスPosixReactorBaseのメソッドであるリスニングイベントを登録します.
# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
ReactorBase):
def listenTCP(self, port, factory, backlog=50, interface=''):
p = tcp.Port(port, factory, backlog, interface, self)
p.startListening()
return p
# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
def __init__(self, port, factory, backlog=50, interface='', reactor=None):
"""Initialize with a numeric port to listen on.
"""
base.BasePort.__init__(self, reactor=reactor)
self.port = port
self.factory = factory
self.backlog = backlog
if abstract.isIPv6Address(interface):
self.addressFamily = socket.AF_INET6
self._addressType = address.IPv6Address
self.interface = interface
...
def startListening(self):
"""Create and bind my socket, and begin listening on it.
, 。
This is called on unserialization, and must be called after creating a
server to begin listening on the specified port.
"""
if self._preexistingSocket is None:
# Create a new socket and make it listen
try:
#
skt = self.createInternetSocket()
if self.addressFamily == socket.AF_INET6:
addr = _resolveIPv6(self.interface, self.port)
else:
addr = (self.interface, self.port)
#
skt.bind(addr)
except socket.error as le:
raise CannotListenError(self.interface, self.port, le)
#
skt.listen(self.backlog)
else:
# Re-use the externally specified socket
skt = self._preexistingSocket
self._preexistingSocket = None
# Avoid shutting it down at the end.
self._shouldShutdown = False
# Make sure that if we listened on port 0, we update that to
# reflect what the OS actually assigned us.
self._realPortNumber = skt.getsockname()[1]
log.msg("%s starting on %s" % (
self._getLogPrefix(self.factory), self._realPortNumber))
# The order of the next 5 lines is kind of bizarre. If no one
# can explain it, perhaps we should re-arrange them.
self.factory.doStart()
self.connected = True
self.socket = skt
self.fileno = self.socket.fileno
self.numberAccepts = 100
# startReading reactor addReader Port
self.startReading()
論理全体が簡単で、通常のserver側と同様に、ソケット、バインド、リスニングを作成します.異なるのは、reactorの読み取りセットにソケットの記述子を追加することです.ではclientが接続されているとreactorが監視し、イベントハンドラがトリガーされます.
reacotr.run()イベントメインループ
# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
ReactorBase)
# twisted/internet/base.py
class _SignalReactorMixin(object):
def startRunning(self, installSignalHandlers=True):
"""
PosixReactorBase _SignalReactorMixin ReactorBase ,
_SignalReactorMixin , mro , _SignalReactorMixin 。
"""
self._installSignalHandlers = installSignalHandlers
ReactorBase.startRunning(self)
def run(self, installSignalHandlers=True):
self.startRunning(installSignalHandlers=installSignalHandlers)
self.mainLoop()
def mainLoop(self):
while self._started:
try:
while self._started:
# Advance simulation time in delayed event
# processors.
self.runUntilCurrent()
t2 = self.timeout()
t = self.running and t2
# doIteration ,select,poll,epool
self.doIteration(t)
except:
log.msg("Unexpected error in main loop.")
log.err()
else:
log.msg('Main loop terminated.')
mianLoopは最終的な主ループであり,ループではdoIterationメソッドを呼び出して読み書き記述子の集合を監視し,記述子が読み書きの準備ができていることを発見すると対応するイベントハンドラを呼び出す.
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
def __init__(self):
"""
Initialize file descriptor tracking dictionaries and the base class.
"""
self._reads = set()
self._writes = set()
posixbase.PosixReactorBase.__init__(self)
def doSelect(self, timeout):
"""
Run one iteration of the I/O monitor loop.
This will run all selectables who had input or output readiness
waiting for them.
"""
try:
# select ,
r, w, ignored = _select(self._reads,
self._writes,
[], timeout)
except ValueError:
# Possibly a file descriptor has gone negative?
self._preenDescriptors()
return
except TypeError:
# Something *totally* invalid (object w/o fileno, non-integral
# result) was passed
log.err()
self._preenDescriptors()
return
except (select.error, socket.error, IOError) as se:
# select(2) encountered an error, perhaps while calling the fileno()
# method of a socket. (Python 2.6 socket.error is an IOError
# subclass, but on Python 2.5 and earlier it is not.)
if se.args[0] in (0, 2):
# windows does this if it got an empty list
if (not self._reads) and (not self._writes):
return
else:
raise
elif se.args[0] == EINTR:
return
elif se.args[0] == EBADF:
self._preenDescriptors()
return
else:
# OK, I really don't know what's going on. Blow up.
raise
_drdw = self._doReadOrWrite
_logrun = log.callWithLogger
for selectables, method, fdset in ((r, "doRead", self._reads),
(w,"doWrite", self._writes)):
for selectable in selectables:
# if this was disconnected in another thread, kill it.
# ^^^^ --- what the !@#*? serious! -exarkun
if selectable not in fdset:
continue
# This for pausing input when we're not ready for more.
# _doReadOrWrite
_logrun(selectable, _drdw, selectable, method)
doIteration = doSelect
def _doReadOrWrite(self, selectable, method):
try:
# method,doRead doWrite,
# selectable tcp.Port
why = getattr(selectable, method)()
except:
why = sys.exc_info()[1]
log.err()
if why:
self._disconnectSelectable(selectable, why, method=="doRead")
クライアントに接続要求があると、リードセット中のtcpが呼び出される.PortのdoReadメソッド.
?
# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
def doRead(self):
"""Called when my socket is ready for reading.
This accepts a connection and calls self.protocol() to handle the
wire-level protocol.
"""
try:
if platformType == "posix":
numAccepts = self.numberAccepts
else:
numAccepts = 1
for i in range(numAccepts):
if self.disconnecting:
return
try:
# accept
skt, addr = self.socket.accept()
except socket.error as e:
if e.args[0] in (EWOULDBLOCK, EAGAIN):
self.numberAccepts = i
break
elif e.args[0] == EPERM:
continue
elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
log.msg("Could not accept new connection (%s)" % (
errorcode[e.args[0]],))
break
raise
fdesc._setCloseOnExec(skt.fileno())
protocol = self.factory.buildProtocol(self._buildAddr(addr))
if protocol is None:
skt.close()
continue
s = self.sessionno
self.sessionno = s+1
# transport , reactor ,
# , doRead
transport = self.transport(skt, protocol, addr, self, s, self.reactor)
protocol.makeConnection(transport)
else:
self.numberAccepts = self.numberAccepts+20
except:
log.deferr()
# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
def doRead(self):
"""Called when my socket is ready for reading.
This accepts a connection and calls self.protocol() to handle the
wire-level protocol.
"""
try:
if platformType == "posix":
numAccepts = self.numberAccepts
else:
numAccepts = 1
for i in range(numAccepts):
if self.disconnecting:
return
try:
# accept
skt, addr = self.socket.accept()
except socket.error as e:
if e.args[0] in (EWOULDBLOCK, EAGAIN):
self.numberAccepts = i
break
elif e.args[0] == EPERM:
continue
elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
log.msg("Could not accept new connection (%s)" % (
errorcode[e.args[0]],))
break
raise
fdesc._setCloseOnExec(skt.fileno())
protocol = self.factory.buildProtocol(self._buildAddr(addr))
if protocol is None:
skt.close()
continue
s = self.sessionno
self.sessionno = s+1
# transport , reactor ,
# , doRead
transport = self.transport(skt, protocol, addr, self, s, self.reactor)
protocol.makeConnection(transport)
else:
self.numberAccepts = self.numberAccepts+20
except:
log.deferr()
doReadメソッドでは、acceptを呼び出してクライアントデータを受信するためのソケットを生成し、ソケットをtransportにバインドし、transportをreactorの読み取りセットに追加する.クライアントにデータが来ると、transportのdoReadメソッドを呼び出してデータ読み込みを行います.
Connectionは、サーバ(transportインスタンスのクラス)の親であり、doReadメソッドを実装します.
# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
_AbortingMixin):
def doRead(self):
try:
#
data = self.socket.recv(self.bufferSize)
except socket.error as se:
if se.args[0] == EWOULDBLOCK:
return
else:
return main.CONNECTION_LOST
return self._dataReceived(data)
def _dataReceived(self, data):
if not data:
return main.CONNECTION_DONE
# protocol dataReceived
rval = self.protocol.dataReceived(data)
if rval is not None:
offender = self.protocol.dataReceived
warningFormat = (
'Returning a value other than None from %(fqpn)s is '
'deprecated since %(version)s.')
warningString = deprecate.getDeprecationWarningString(
offender, versions.Version('Twisted', 11, 0, 0),
format=warningFormat)
deprecate.warnAboutFunction(offender, warningString)
return rval
_DataReceivedでは、例でカスタマイズしたEchoProtocolのdataReceivedメソッドを呼び出してデータを処理します.
この文書では、Twisted reactorプロセスを概略的に分析し、リスニングイベントの作成からクライアントデータの受信までを終了します.