Tornado非同期フレームワーク学習


Tornadoは非常に優れた非ブロックウェブサービスフレームワークです.機能によってmoduleを次のように分けることができます.
Core web framework:
    web
Application(Web Applicationはpath解析とリクエスト配信を担当し、重要な方法'_call_')
RequestHandler(リクエスト処理基本、http verb処理リクエストの再ロード)
httpserver(TCPServerとルーティング読み書き要求の初期化)
Asynchronous networking:
ioloop(callbackのスケジューリングおよびsocket fdのポーリング、対応する読み書きまたはエラーイベントの処理)
iostream(非同期の読み書き処理)
Netutil(バインドソケットの作成、ポートの傍受、リンク待ち)
Other utilities
Tornadoの非同期実装はioloop(socket fd読み書きイベントをポーリング)とiostream(非同期読み書き)の基礎の上に構築される.
異なるosで使用されるポーリングモデルが異なるため、linuxカーネルがサポートするepollモデルのみを選択して説明します.
    We use epoll (Linux) or kqueue (BSD and Mac OS X; requires python 2.6+) if they are available, 
    or else we fall back on select(). If you are implementing a system that needs to handle thousands of
    simultaneous connections, you should use a system that supports either epoll or queue.
最良の学習参考資料はコードであり、以下はフレームワークの正常な使用プロセスに基づいて、各モジュールと方法の情報図を提供する.
    
import tornado.web
import tornado.ioloop

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        return 'Hello World'

application = tornado.web.Application([(r'/', MainHandler),],)

application.listen(8080)
tornado.ioloop.IOLoop.instance().start()
Application                                             HTTPServer
    __init__    
    add_handler(pattern, handler)
    listen(port, address, **kwargs)
        server = HTTPServer(self, **kwargs)             __init__(request_callback, ...)
        |                                                   self.request_callback = request_callback
        v                                                   TCPServer.__init__()
        server.listen(port, address)                    handle_stream(stream, address, self.request_callback)
    __call__(self, request)                                 HTTPConnection(...) 
TCPServer                                                               HTTPConnection
    __init__()                                                              __init__
    listen(port, address)                                                       self.request_callback = request_callback
        sockets = bind_socket(port, address)                                    self._header_callback = stack_context.wrap(self._on_headers)
            #getaddrinfo,create socket, setsockopt                              self.stream.read_until(r'\r
\r
', self._header_callback) #setblocking(0), bind, listen add_sockets(sockets) _on_headers(self, data) self.io_loop = IOLoop.instance() # parse http protocol add_accept_handler(sock, self._handle_connection, eol = data.find(r'\r
') self.io_loop) start_line = data[:eol] def accept_handler(fd, events) method, uri, version = start_line.split(' ') while True: headers = httputil.HTTPHeaders.parse(data[eol:]) connection, address = sock.accept() # construct a request object self._request = HTTPRequest(connection=self, method, uri, version, headers, remote_address) self._handle_connection(connection, address) content-length = headers.get('Content-Length') #If ssl_options wrap socket if content-length: stream = IOStream(connection, self.io_loop) self.stream.read_bytes(content-length, self._on_request_body) #HTTPServer implement handle_stream return self.handle_stream(stream, address) self.request_callback(self._request) -> HTTPServer.handle_stream _on_request_body(self, data) self.io_loop.add_handler(sock.fileno(), accept_handler, # parse content body, and set thire info to self._request.arguments IOLoop.READ) self.request_callback(self._request)

TcpServerは非常に重要なメソッドaddを呼び出します.accept_handler、接続コールバック関数と対応するsocketを渡して、最終的に
ioloopオブジェクト登録socket fdと対応するcallback操作とイベントを呼び出します.同時にここでno-blocking socketと非同期の
読み書きはIOStreamで接続されています.
IOLoop                                                                      IOStream
    __init__(self, impl=None) #Singleton                                        __init__(conn, self.io_loop)  
                                                                                    #max_buffer_size = 100MB, read_chunk_size=4096 byte
        self._impl = impl or _poll()                                                self._read_buffer = collections.deque()
                                                                                    self._write_buffer = collections.deque()
        self._waker = Waker()                                                   read_until(delimiter, callback)
        self.add_handler(self._waker.fileno(),                                      self._read_delimiter = delimiter
                lambda fd, events: self._waker.consume(),                           self._read_callback = callback
                self.READ)                                                          while True:
                                                                                        if self._read_from_buffer():
    add_handler(fd, handler, events)                                                        return 
        self._handlers[fd] = stack_context.wrap(handler)                                self.check_closed()
        self._impl.register(fd, events|IOLoop.ERROR)                                    if self._read_to_buffer() == 0:
                                                                                            break
    add_callback(callback)                                                              self._add_io_state(self.io_loop.READ)
        self._callbacks.append(stack_context(callback))                                                                        
                                                                                _read_to_buffer()
    start()                                                                         chunk = self.socket.recv(read_chunk_size) 
        while True:                                                                  self._read_buffer.append(chunk)
            poll_timeout = 0.2                                                       self._read_buffer_size += len(chunk)
            with self._callbacks_lock:
                callbacks, self._callbacks = self._callbacks, []                     # Check read buffer size
            for callback in callbacks:                                               return len(chunk)  
                self._run_callback(callback)                                    _read_from_buffer() 
                                                                                    _merge_prefix(self._read_buffer, sys.maxint)
            #deal with timeout handler                                              loc = self._read_buffer[0].find(self._read_delimiter)
            event_pairs = self._imol.poll(poll_timeout)                             if loc != -1:
            self._events.update(event_pairs)                                            callback = self._read_callback
            while self._events:                                                         # reset read control variable
                fd, event = self._events.popitem()                                      self._run_callback(callback, self._consume(loc + delimiter_len))
                self._handlers[fd](fd, event)                                           return True
                                                                                _run_callback(self, callback, *args)
                                                                                    def wrapper():
                                                                                        callback(*args)
                                                                                    with stack_context.NullContext():
                                                                                        self.io_loop.add_callback(wrapper)

IOLoopのstartメソッドが呼び出されると、IOLoopオブジェクトは、遅延したデータ処理callbackおよびtimeout callbackのループ処理を開始する.最終ポーリングsocket fd
fdに対応するcallbackメソッドを処理します.startメソッド処理self.callback、いったい何なのか、いつ追加されたのか.後でゆっくりベールを外します.
仮にself.imol.Poll(poll_timeout)では、クライアント接続のあるfdにreadイベントがあることが検出され、対応するcallbackメソッド(TCPServerオブジェクトのaccept_handlerオブジェクト)が呼び出されます.Acceptは接続され、IOStreamオブジェクトの作成に続いています(ssl_optionsオプションに従って暗号化socketかどうかを決定します).そしてhandle_を呼び出すstream(handle_streamはHTTPServerで定義されているため、実際にはここでの実装が呼び出される).HTTPConnectionオブジェクトを作成します.HTTPConnectionでの初期化関数_init__()で、IOStream read_を呼び出します.untilメソッドはデータの読み取りを開始します.まず、読み取ったデータが
条件を満たす、満たさなければsocketキャッシュからのデータの読み出しを継続する.条件が満たされている場合は、_を呼び出します.run_callbackは、対応する処理関数(self._header_callback)appendをIOLoopオブジェクトに、次のポーリングに遅延する処理を行う.政府もこのような操作の原因を示しています.
        # * Prevents unbounded stack growth when a callback calls an IOLoop operation that immediately runs another callback
        # * Provides a predictable execution context for e.g.  non-reentrant mutexes
        # * Ensures that the try/except in wrapper() is run outside of the application's StackContexts
このcallbackに次回ポーリング処理が行われると、実行前に転送_header_callback関数オブジェクト、HTTPRequestオブジェクトを作成し、
要求情報にbodyが含まれている場合はbodyデータブロックが処理する、プロセスはheader処理と同様である.ヘッダとbodyの解析が完了すると、
HTTPConnectionオブジェクト呼び出しrequest_callbackメソッドは、HTTPRequestオブジェクトを渡す.ここのrequest_callbackオブジェクトは私たちです
以前に作成されたアプリケーションオブジェクトです.これも、アプリケーションを定義する理由です.call__メソッド._call__方法解析からpathとmethodを得る
対応するRequestHandlerオブジェクトにルーティングし、対応するメソッドオブジェクトを動的に取得し、対応するメソッドを呼び出す. 
Responseプロシージャはreadと大きく異なりませんが、実際にはHTTPRequestオブジェクトにはHTTPConnectionオブジェクトが含まれており、最終的な書き込み操作はIOStreamの
writeメソッド
この文章を通じて、基本的にTornado webサービスの仕事に対して初歩的な理解があるべきだ.もっと深く入りたいなら、
ソースコードを直接见て、结局Tornadoのソースコードはとても友好的で、多くの実现はすべてとても高い参考があります.