PythonのTwistedフレームワークにおけるreactorイベント理解

16712 ワード

PythonのTwistedフレームワークにおけるreactorイベントマネージャの使い方を詳しく理解する
本文は主にPythonのTwistedフレームワークのreactorイベントマネージャの使い方を詳しく説明し、Twistedは人気のある非同期Python開発フレームワークである.
多くの実践の中で、私たちはいつも同様の方法で非同期プログラミングを使用しているようです.
リスニングイベントイベント発生実行対応コールバック関数コールバック完了(新しいイベントがリスニングキューに追加される可能性がある)1に戻ると、リスニングイベント従って、このような非同期モードをReactorモードと呼ぶ.例えばiOS開発におけるRun Loopの概念は、実際にはReactor loopと非常に類似しており、メインスレッドのRun Loopは画面UIイベントをリスニングし、UIイベントが発生すると対応するイベント処理コードを実行し、GCDなどの方法でメインスレッドにイベントを生成して実行することもできる.
上図はboostのReactorモードの描写で、Twistedの設計はこのようなReactorモードに基づいて、Twistedプログラムはイベントを待って、イベントを処理する過程で絶えず循環します.
?
from twisted.internet import reactor
reactor.run()

reactorはTwistedプログラムの単一のオブジェクトです.
reactor reactorはイベントマネージャで、イベントを登録、ログアウトし、イベントループを実行し、イベントが発生したときにコールバック関数処理を呼び出す.reactorについて以下の結論があります.
Twistedのreactorはreactorを呼び出すしかない.run()で起動します.
reactorループは、開始したプロセスで実行されます.つまり、メインプロセスで実行されます.
いったん起動すると、ずっと動いていきます.reactorは、プログラムの制御下(または、そのスレッドを起動する制御下)にあります.
ReactorサイクルはCPUのリソースを消費しません.
明示的にreactorを作成する必要はなく、導入するだけでOKです.
最後の1つははっきり説明する必要がある.Twistedでは、reactorはSingleton(すなわち、単一のモード)であり、1つのプログラムに1つのreactorしか存在せず、それを導入すればそれに応じて1つを作成する.上記の方法はtwistedのデフォルトで使用される方法です.もちろん、twistedにはreactorを導入できる他の方法があります.例えばtwisted.internet.pollreactorのシステム呼び出しはselectメソッドの代わりにpollを呼び出す.
他のreactorを使用する場合はtwistedを導入する必要があります.internet.reactorの前にインストールします.次にpollreactorをインストールする方法を示します.
?
from twisted.internet import pollreactor
pollreactor.install()

他の特殊なreactorをインストールしていない場合はtwistedを導入します.internet.reactor、Twistedはオペレーティングシステムに基づいてデフォルトのreactorをインストールします.そのため、デフォルトのreactorをインストールしないように、最上位のモジュールにreactorを導入するのではなく、reactorを使用する領域にインストールするのが習慣です.次はpollreactorを使用して上のプログラムを書き直します.
?
from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

ではreactorはどのようにして単例を実現しますか?ちょっと見てインターネットimport reactorはどんなことをしたのか分かります.
次はtwisted/internet/reactorです.pyの部分コード:
# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

?
注意:Pythonのメモリにロードされているすべてのモジュールはsysに配置されています.modules、グローバル辞書です.importモジュールが1つある場合、まずこのリストでモジュールがロードされているかどうかを検索します.ロードされている場合は、importを呼び出しているモジュールのネーミングスペースにモジュールの名前を追加するだけです.ロードされていない場合はsysから.pathディレクトリではモジュール名に従ってモジュールファイルを検索し、見つけたらモジュールをメモリにロードしsysに追加する.modulesで、名前を現在のネーミングスペースにインポートします.
もし私たちが初めてfrom twistedを実行したら.インターネットimport reactor、sys.modulesにはまだtwistedがありません.internet.reactor、だからreactoryを実行します.pyのコードで、デフォルトのreactorをインストールします.その後、導入するとsys.modulesには既にモジュールが存在するのでsysを直接modulesのtwisted.internet.reactorは現在のネーミングスペースにインポートされます.
defaultのinstall:
# twisted/internet/selectreactor.py:
def install():
  """Configure the twisted mainloop to be run using the select() reactor.
  """
  #   
  reactor = SelectReactor()
  from twisted.internet.main import installReactor
  installReactor(reactor)
 
# twisted/internet/main.py:
def installReactor(reactor):
  """
  Install reactor C{reactor}.
 
  @param reactor: An object that provides one or more IReactor* interfaces.
  """
  # this stuff should be common to all reactors.
  import twisted.internet
  import sys
  if 'twisted.internet.reactor' in sys.modules:
    raise error.ReactorAlreadyInstalledError("reactor already installed")
  twisted.internet.reactor = reactor
  sys.modules['twisted.internet.reactor'] = reactor?

明らかにdefaultではプラットフォームに基づいて対応するinstallが取得されます.Linuxではまずepollreactorが使用され、カーネルがサポートされていない場合はpollreactorしか使用できません.Macプラットフォームはpollreactor、windowsはselectreactorを使用します.各installの実装はそれほど悪くありませんが、ここではselectreactorのinstallを抽出してみましょう.
# twisted/internet/selectreactor.py:
def install():
  """Configure the twisted mainloop to be run using the select() reactor.
  """
  #   
  reactor = SelectReactor()
  from twisted.internet.main import installReactor
  installReactor(reactor)
 
# twisted/internet/main.py:
def installReactor(reactor):
  """
  Install reactor C{reactor}.
 
  @param reactor: An object that provides one or more IReactor* interfaces.
  """
  # this stuff should be common to all reactors.
  import twisted.internet
  import sys
  if 'twisted.internet.reactor' in sys.modules:
    raise error.Reacto
installReactorでsys.modules twistedを追加internet.reactorキー、値は再installで作成された単一のreactorです.後でreactorを使用すると、この例がインポートされます.
SelectReactor
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementerはSelectReactorがIreactorFDSetインタフェースを実現した方法を表し、ここではzopeを用いる.interface、それはpythonの中のインタフェースの実現で、興味のある学生は見に行くことができます.
IreactorFDSetインタフェースは主に記述子の取得、追加、削除などの操作の方法である.これらの方法は名前を見れば意味がわかるので、注釈をつけませんでした.
# twisted/internet/interfaces.py
class IReactorFDSet(Interface):
 
  def addReader(reader):
 
  def addWriter(writer):
 
  def removeReader(reader):
 
  def removeWriter(writer):
 
  def removeAll():
 
  def getReaders():
 
  def getWriters():
reactor.listenTCP()

例のreactor.ListenTCP()は、親クラスPosixReactorBaseのメソッドであるリスニングイベントを登録します.
# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase):
 
  def listenTCP(self, port, factory, backlog=50, interface=''):
    p = tcp.Port(port, factory, backlog, interface, self)
    p.startListening()
    return p
 
# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
  def __init__(self, port, factory, backlog=50, interface='', reactor=None):
    """Initialize with a numeric port to listen on.
    """
    base.BasePort.__init__(self, reactor=reactor)
    self.port = port
    self.factory = factory
    self.backlog = backlog
    if abstract.isIPv6Address(interface):
      self.addressFamily = socket.AF_INET6
      self._addressType = address.IPv6Address
    self.interface = interface
  ...
 
  def startListening(self):
    """Create and bind my socket, and begin listening on it.
             ,    。
 
    This is called on unserialization, and must be called after creating a
    server to begin listening on the specified port.
    """
    if self._preexistingSocket is None:
      # Create a new socket and make it listen
      try:
        #      
        skt = self.createInternetSocket()
        if self.addressFamily == socket.AF_INET6:
          addr = _resolveIPv6(self.interface, self.port)
        else:
          addr = (self.interface, self.port)
        #   
        skt.bind(addr)
      except socket.error as le:
        raise CannotListenError(self.interface, self.port, le)
      #   
      skt.listen(self.backlog)
    else:
      # Re-use the externally specified socket
      skt = self._preexistingSocket
      self._preexistingSocket = None
      # Avoid shutting it down at the end.
      self._shouldShutdown = False
 
    # Make sure that if we listened on port 0, we update that to
    # reflect what the OS actually assigned us.
    self._realPortNumber = skt.getsockname()[1]
 
    log.msg("%s starting on %s" % (
        self._getLogPrefix(self.factory), self._realPortNumber))
 
    # The order of the next 5 lines is kind of bizarre. If no one
    # can explain it, perhaps we should re-arrange them.
    self.factory.doStart()
    self.connected = True
    self.socket = skt
    self.fileno = self.socket.fileno
    self.numberAccepts = 100
 
    # startReading  reactor addReader   Port     
    self.startReading()

論理全体が簡単で、通常のserver側と同様に、ソケット、バインド、リスニングを作成します.異なるのは、reactorの読み取りセットにソケットの記述子を追加することです.ではclientが接続されているとreactorが監視し、イベントハンドラがトリガーされます.
reacotr.run()イベントメインループ
# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
            ReactorBase)
 
# twisted/internet/base.py
class _SignalReactorMixin(object):
 
  def startRunning(self, installSignalHandlers=True):
    """
    PosixReactorBase   _SignalReactorMixin ReactorBase     ,  
    _SignalReactorMixin  ,  mro    ,    _SignalReactorMixin  。
    """
    self._installSignalHandlers = installSignalHandlers
    ReactorBase.startRunning(self)
 
  def run(self, installSignalHandlers=True):
    self.startRunning(installSignalHandlers=installSignalHandlers)
    self.mainLoop()
 
  def mainLoop(self):
    while self._started:
      try:
        while self._started:
          # Advance simulation time in delayed event
          # processors.
          self.runUntilCurrent()
          t2 = self.timeout()
          t = self.running and t2
          # doIteration   ,select,poll,epool      
          self.doIteration(t)
      except:
        log.msg("Unexpected error in main loop.")
        log.err()
      else:
        log.msg('Main loop terminated.')

mianLoopは最終的な主ループであり,ループではdoIterationメソッドを呼び出して読み書き記述子の集合を監視し,記述子が読み書きの準備ができていることを発見すると対応するイベントハンドラを呼び出す.
# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
 
  def __init__(self):
    """
    Initialize file descriptor tracking dictionaries and the base class.
    """
    self._reads = set()
    self._writes = set()
    posixbase.PosixReactorBase.__init__(self)
 
  def doSelect(self, timeout):
    """
    Run one iteration of the I/O monitor loop.
 
    This will run all selectables who had input or output readiness
    waiting for them.
    """
    try:
      #   select        ,           
      r, w, ignored = _select(self._reads,
                  self._writes,
                  [], timeout)
    except ValueError:
      # Possibly a file descriptor has gone negative?
      self._preenDescriptors()
      return
    except TypeError:
      # Something *totally* invalid (object w/o fileno, non-integral
      # result) was passed
      log.err()
      self._preenDescriptors()
      return
    except (select.error, socket.error, IOError) as se:
      # select(2) encountered an error, perhaps while calling the fileno()
      # method of a socket. (Python 2.6 socket.error is an IOError
      # subclass, but on Python 2.5 and earlier it is not.)
      if se.args[0] in (0, 2):
        # windows does this if it got an empty list
        if (not self._reads) and (not self._writes):
          return
        else:
          raise
      elif se.args[0] == EINTR:
        return
      elif se.args[0] == EBADF:
        self._preenDescriptors()
        return
      else:
        # OK, I really don't know what's going on. Blow up.
        raise
 
    _drdw = self._doReadOrWrite
    _logrun = log.callWithLogger
    for selectables, method, fdset in ((r, "doRead", self._reads),
                      (w,"doWrite", self._writes)):
      for selectable in selectables:
        # if this was disconnected in another thread, kill it.
        # ^^^^ --- what the !@#*? serious! -exarkun
        if selectable not in fdset:
          continue
        # This for pausing input when we're not ready for more.
 
        #   _doReadOrWrite  
        _logrun(selectable, _drdw, selectable, method)
 
  doIteration = doSelect
 
  def _doReadOrWrite(self, selectable, method):
    try:
      #   method,doRead   doWrite,
      #    selectable        tcp.Port
      why = getattr(selectable, method)()
    except:
      why = sys.exc_info()[1]
      log.err()
    if why:
      self._disconnectSelectable(selectable, why, method=="doRead")

クライアントに接続要求があると、リードセット中のtcpが呼び出される.PortのdoReadメソッド.
?
# twisted/internet/tcp.py
 
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
 
  def doRead(self):
    """Called when my socket is ready for reading.
                 
 
    This accepts a connection and calls self.protocol() to handle the
    wire-level protocol.
    """
    try:
      if platformType == "posix":
        numAccepts = self.numberAccepts
      else:
        numAccepts = 1
      for i in range(numAccepts):
        if self.disconnecting:
          return
        try:
          #   accept
          skt, addr = self.socket.accept()
        except socket.error as e:
          if e.args[0] in (EWOULDBLOCK, EAGAIN):
            self.numberAccepts = i
            break
          elif e.args[0] == EPERM:
            continue
          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
            log.msg("Could not accept new connection (%s)" % (
              errorcode[e.args[0]],))
            break
          raise
 
        fdesc._setCloseOnExec(skt.fileno())
        protocol = self.factory.buildProtocol(self._buildAddr(addr))
        if protocol is None:
          skt.close()
          continue
        s = self.sessionno
        self.sessionno = s+1
        # transport       ,       reactor     ,      
        #      ,       doRead              
        transport = self.transport(skt, protocol, addr, self, s, self.reactor)
        protocol.makeConnection(transport)
      else:
        self.numberAccepts = self.numberAccepts+20
    except:
      log.deferr()
# twisted/internet/tcp.py
 
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
 
  def doRead(self):
    """Called when my socket is ready for reading.
                 
 
    This accepts a connection and calls self.protocol() to handle the
    wire-level protocol.
    """
    try:
      if platformType == "posix":
        numAccepts = self.numberAccepts
      else:
        numAccepts = 1
      for i in range(numAccepts):
        if self.disconnecting:
          return
        try:
          #   accept
          skt, addr = self.socket.accept()
        except socket.error as e:
          if e.args[0] in (EWOULDBLOCK, EAGAIN):
            self.numberAccepts = i
            break
          elif e.args[0] == EPERM:
            continue
          elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
            log.msg("Could not accept new connection (%s)" % (
              errorcode[e.args[0]],))
            break
          raise
 
        fdesc._setCloseOnExec(skt.fileno())
        protocol = self.factory.buildProtocol(self._buildAddr(addr))
        if protocol is None:
          skt.close()
          continue
        s = self.sessionno
        self.sessionno = s+1
        # transport       ,       reactor     ,      
        #      ,       doRead              
        transport = self.transport(skt, protocol, addr, self, s, self.reactor)
        protocol.makeConnection(transport)
      else:
        self.numberAccepts = self.numberAccepts+20
    except:
      log.deferr()

doReadメソッドでは、acceptを呼び出してクライアントデータを受信するためのソケットを生成し、ソケットをtransportにバインドし、transportをreactorの読み取りセットに追加する.クライアントにデータが来ると、transportのdoReadメソッドを呼び出してデータ読み込みを行います.
Connectionは、サーバ(transportインスタンスのクラス)の親であり、doReadメソッドを実装します.
# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
         _AbortingMixin):
 
  def doRead(self):
    try:
      #     
      data = self.socket.recv(self.bufferSize)
    except socket.error as se:
      if se.args[0] == EWOULDBLOCK:
        return
      else:
        return main.CONNECTION_LOST
 
    return self._dataReceived(data)
 
  def _dataReceived(self, data):
    if not data:
      return main.CONNECTION_DONE
    #        protocol dataReceived      
    rval = self.protocol.dataReceived(data)
    if rval is not None:
      offender = self.protocol.dataReceived
      warningFormat = (
        'Returning a value other than None from %(fqpn)s is '
        'deprecated since %(version)s.')
      warningString = deprecate.getDeprecationWarningString(
        offender, versions.Version('Twisted', 11, 0, 0),
        format=warningFormat)
      deprecate.warnAboutFunction(offender, warningString)
    return rval

_DataReceivedでは、例でカスタマイズしたEchoProtocolのdataReceivedメソッドを呼び出してデータを処理します.
この文書では、Twisted reactorプロセスを概略的に分析し、リスニングイベントの作成からクライアントデータの受信までを終了します.