PythonのTwistedフレームの核心特性を分析します。

9116 ワード

一.レクチャー
twistedの核心はreactorであり、reactorといえば同期/非同期、ブロック/非ブロックが避けられないが、Daveの第一章概念紹介では同期/非同期の限界が少しぼやけています。同期/非同期については、ブロック/ブロック/非ブロックについては、知覚的な議論を参照してください。proactorとreactorについては、ここで紹介しているブログがあります。
reactorモードのネットワークIOは、同期IOであって、非同期IOではないはずです。Dave第一章で言及した非同期は、オペレーティングシステムによってランダムに停止されるのではなく、タスクに対するコントロールを明示的に放棄することであり、プログラマはタスクをシーケンスに組織して交替するステップによって完成しなければならない。したがって、一つのタスクが他のタスクの出力に使用される場合、依存するタスク(すなわち出力を受信するタスク)は、一連のビットまたはスライスを一度に受信するのではなく、受信するように設計される必要がある。
明示的に自発的に任務を放棄する制御権は協働の考え方に似ています。reactorは協働のスケジューラと見なすことができます。reactorはイベントサイクルです。私たちはreactorに自分の興味のあるイベント(例えば、ソケット読み取り/書き込み可能)とプロセッサ(例えば、読み書き操作を実行する)を登録してください。reactorはイベントが発生した時にプロセッサに連絡して、プロセッサが実行された後、ワークステーションに相当します。reactor自体には同期イベントのマルチプレックスがあり、select/epollなどの仕組みで実現できます。もちろんtwisted reactorのイベントトリガはIOに基づくものではなく、タイマーなどの他のメカニズムでトリガすることもできます。
twistedのreactorは、イベントとコールバック関数を積極的に登録する必要はなく、多状態(特定のクラスを継承し、関心のあるイベントインターフェースを実現し、twisted reactorに転送する)で実現します。twistedのreactorについて、いくつか注意すべき点があります。
twisted.internet.reactorは単一の例のモードで、各プログラムは一つのreactorしかありません。
できるだけ早くreactorコール関数で操作を完了してください。ブロックされたタスクを実行しないでください。reactorの本質はシングルスレッドです。ユーザーコールバックコードとtwistedコードは同じコンテキストで実行されます。
reactorは常に動作します。reactor.stopを通じて(通って)それを停止すると表示されない限り、reactor.stop()を呼び出します。つまり、アプリケーションは終了します。
二.twistedを簡単に使う
twistedの本質はreactorです。twistedの一番下のAPI(twistedを避けて便利な高層抽象)を使ってreactorを使うことができます。

#     twisted  API   
from twisted.internet import reacto
from twisted.internet import main
from twisted.internet.interfaces import IReadDescriptor
import socket

class MySocket(IReadDescriptor):
  def __init__(self, address):
    #      
    self.address = address
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.connect(address)
    self.sock.setblocking(0)

    # tell the Twisted reactor to monitor this socket for reading
    reactor.addReader(self)
 
 #   :   reactor          
  def fileno(self):
    try:
      return self.sock.fileno()
    except socket.error:
      return -1
      
 #   :          
  def connectionLost(self, reason):
    self.sock.close()

    reactor.removeReader(self)
 
 #              :
    # reactor.stop()

 #   :              
  def doRead(self):
    bytes = ''

 #          
    while True:
      try:
        bytesread = self.sock.recv(1024)
        if not bytesread:
          break
        else:
          bytes += bytesread
      except socket.error, e:
        if e.args[0] == errno.EWOULDBLOCK:
          break
        return main.CONNECTION_LOST

    if not bytes: 
      return main.CONNECTION_DONE
    else:
      #             
      print bytes

一例としては、twistedのreactorの本質がよく見られます。傍受ディスクリプタを追加し、読み取り可能/書き込み可能なイベントを傍受し、イベントが到来したときには、コールバック関数が、コールが完了した後、イベントの傍受を継続します。
注意が必要です
ソケットは非閉塞で、閉塞するとreactorの意味がなくなります。
私たちはIreadDescriptorを継承してreactorに必要なインターフェースを提供します。
reactor.addReaderでソケット類をreactorの傍受対象に加える。
メイン.C.ONECTIONLOSTはtwistedによって予め定義された値です。これらの値によって次のステップのフィードバックをある程度制御できます。
しかし、上のMySocket類はあまり良くないです。主に以下の欠点があります。
私達は自分でデータを読みに行きます。フレームではなく、自分達のために読んでください。そして異常を処理します。
ネットワークIOとデータ処理が混ざって剥がれませんでした。
三.twisted抽象
twistedはreactorの基礎の上で、より高い抽象を創立して、1つのネットの接続にとって、twistedは次の3つの概念を創立しました。
Transports:ネットワーク接続層は、ネットワーク接続と読み書きのみを担当します。
Protocls:プロトコル層、サービス業務に関するネットワークプロトコルは、バイトストリームをアプリケーションに必要なデータに変換する。
Protocol Factores:プロトコル工場で、Protoclsの作成を担当しています。各ネットワーク接続にはProtoclsオブジェクトがあります。(プロトコル解析状態を保存するため)
twistedのこれらの概念は、erlangにおけるLanchネットワークのフレームワークと似ています。ranchフレームもTransportsとProtocolsの概念を抽象化しています。新しいネットワーク接続がある時、LanchはTransportsとProtocolsを自動的に作成します。ここでProtocolsはユーザがローンチを起動する時に導入します。protocol behaviourのモジュールは、Protocls初期化時に、この接続に対応するTransportsを受信します。そうすると、Protoclsでバイトストリームデータを処理して、私たちのプロトコルに従ってデータを解析して処理することができます。同時にTransportsでデータを送ることができます。
rachと同様に、twistedも新しい接続が到着した時にProttocolsを作成してTransportを伝えてくれます。twistedは私達にバイトフローのデータを読み取ります。私達はdata Receivedインターフェースでバイトフローのデータを処理すればいいです。この時のtwistedはネットIOでは本当の非同期と言えます。これはネットIOと出会うかもしれない異常を処理してくれました。そしてネットIOとデータ処理を分離して、TransportsとProtocolsと抽象的にして、プログラムの明晰性と壮健性を高めました。

#     twisted     
from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ClientFactory
class MyProtocol(Protocol):
 
 #   : Protocols      ,   Transports
 #    twisted    Protocols factory      ProtocolsFactory     
 #          factory  MyProtocolFactory  
 def makeConnection(self,trans):
    print 'make connection: get transport: ', trans
    print 'my factory is: ', self.factory
    
 #   :      
  def dataReceived(self, data):
    self.poem += data
    msg = 'Task %d: got %d bytes of poetry from %s'
    print msg % (self.task_num, len(data), self.transport.getPeer())
 
 #   :     
  def connectionLost(self, reason):
    #        


class MyProtocolFactory(ClientFactory):

 #   :   protocol          Protocols
  protocol = PoetryProtocol # tell base class what proto to build

  def __init__(self, address):
    self.poetry_count = poetry_count
    self.poems = {} # task num -> poem
    
 #   :    Protocols   
  def buildProtocol(self, address):
    proto = ClientFactory.buildProtocol(self, address)
    #     proto      ....
    return proto
    
 #   :   Server      
  def clientConnectionFailed(self, connector, reason):
    print 'Failed to connect to:', connector.getDestination()
    
def main(address):
 factory = MyClientFactory(address)
  host, port = address
  #         ProtocolsFactory
  reactor.connectTCP(host, port, factory) 
  reactor.run()

例2は、ネットワークIOを処理する必要がなく、論理的により明確であり、実際にはClientFactoryとProtocolは、より柔軟で強力な論理制御を実現するためのより多くのインターフェースを提供しており、具体的なインターフェースはtwistedソースコードを参照してもよい。
四.twisted Deferred
twisted Deferredオブジェクトはこのような問題を解決するために使用されます。Protocols Factoryに自分のフィードバックを埋め込む必要があります。Protocolsの中であるイベントが発生した場合(すべてのProtocsが処理された場合)、私達が指定した関数(Task Finishedなど)をフィードバックします。もし私達自身で返事をするなら、いくつかの問題を処理する必要があります。
どのようにリターンとエラーを区別しますか?私たちは非同期呼び出しを使う時には、特にエラーリターンの重要性に注意します。
正しい戻りとエラーの戻りが共通関数(接続をオフにするなど)を実行する必要がありますか?
もしこのコールバックが一回だけ呼び出されたと保証したら?
この問題を解決するためにDeferredオブジェクトが使用されています。正しい戻りとエラーの戻りにそれぞれ対応して、対応するチェーンの関数を順番に呼び出し、コールバックの一意性を保証します。

d = Deferred()
#            
d.addCallbacks(your_ok_callback, your_err_callback)
#         
d.addBoth(your_common_callback)

#            your_ok_callback(Res) -> common_callback(Res)
d.callback(Res)
#            your_err_callback(Err) -> common_callback(Err)
d.errback(Err)

#   ,    Defered  ,      ,          

twistedのdeferは非同期の一種の現金化方式であり、彼とthreadの違いは、彼が時間eventに基づいていることが理解できる。
deferredがあれば、タスクの実行を管理することができます。プログラムの運行を防止して、ある任務の完成を待つため渋滞に陥って停滞して、全体の運行の効率を高めます。
Deferredは、非同期コードを作成するのを助けることができますが、自動的に非同期またはブロックなしのコードを生成するわけではありません。同期関数をプログラムするためには、Deferredを関数に戻して正確に登録しなければなりません。
五.総合例
以下の例では、あなたたちは自分で走って、私が上で言ったのは全部ばらばらの例です。みんなは下の完全なのを照らし合わせて、もう一回歩きます。twistedは理解するのが実は少し面倒で、みんなは彼が事件に基づくのだと知っているのでさえすれば後で、ゆっくりと理解します。

#coding:utf-8
#xiaorui.cc
from twisted.internet import reactor, defer
from twisted.internet.threads import deferToThread
import os,sys
from twisted.python import threadable; threadable.init(1)
deferred =deferToThread.__get__
import time
def todoprint_(result):
  print result
def running():
  "Prints a few dots on stdout while the reactor is running."
#   sys.stdout.write("."); sys.stdout.flush()
  print '.'
  reactor.callLater(.1, running)
@deferred
def sleep(sec):
  "A blocking function magically converted in a non-blocking one."
  print 'start sleep %s'%sec
  time.sleep(sec)
  print '
end sleep %s'%sec return "ok" def test(n,m): print "fun test() is start" m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) print "fun test() is end" return d if __name__== "__main__": #one sleep(10).addBoth(todoprint_) reactor.callLater(.1, running) reactor.callLater(3, reactor.stop) print "go go !!!" reactor.run() #two aa=time.time() de = defer.Deferred() de.addCallback(test) reactor.callInThread(de.callback,10000000,100 ) print time.time()-aa print " " print de print "go go end"