SocketServerソース分析

20821 ワード

SocketServer.py
Creating network servers.
contents
  • SocketServer.py
  • contents
  • file head
  • BaseServer
  • BaseServer.serve_forever
  • BaseServer.shutdown
  • BaseServer.handle_request
  • BaseServer._handle_request_noblock
  • BaseServer Overridden functions

  • TCPServer
  • UDPServer
  • ForkingMixIn
  • ThreadingMixIn
  • BaseRequestHandler
  • StreamRequestHandler
  • DatagramRequestHandler
  • 著作権

  • file head
    
    __version__ = "0.4"
    
    
    import socket
    import select
    import sys
    import os
    import errno
    try:
        import threading
    except ImportError:
        import dummy_threading as threading
    
    __all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
               "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
               "StreamRequestHandler","DatagramRequestHandler",
               "ThreadingMixIn", "ForkingMixIn"]
    if hasattr(socket, "AF_UNIX"):
        __all__.extend(["UnixStreamServer","UnixDatagramServer",
                        "ThreadingUnixStreamServer",
                        "ThreadingUnixDatagramServer"])
    
    #    EINTR      
    def _eintr_retry(func, *args):
        """restart a system call interrupted by EINTR"""
        while True:
            try:
                return func(*args)
            except (OSError, select.error) as e:
                if e.args[0] != errno.EINTR:
                    raise

    BaseServer
    RequestHandlerClassはhandle関数を登録します.finish_requestでインスタンス化し、ユーザー定義handle関数を呼び出す
    class BaseServer:
        timeout = None
    
        def __init__(self, server_address, RequestHandlerClass):
            """Constructor.  May be extended, do not override."""
            self.server_address = server_address
            self.RequestHandlerClass = RequestHandlerClass
            self.__is_shut_down = threading.Event()
            self.__shutdown_request = False
    
        def server_activate(self):
            """Called by constructor to activate the server.
    
            May be overridden.
    
            """
            pass

    BaseServer.serve_forever
    サービスサイクル
  • 傍受ポート
  • 処理要求
  •     def serve_forever(self, poll_interval=0.5):
            """Handle one request at a time until shutdown.
    
            Polls for shutdown every poll_interval seconds. Ignores
            self.timeout. If you need to do periodic tasks, do them in
            another thread.
            """
            self.__is_shut_down.clear()
            try:
                while not self.__shutdown_request:
                    #    select     ,   EINTR   
                    r, w, e = _eintr_retry(select.select, [self], [], [],
                                           poll_interval)
                    #      
                    if self in r:
                        self._handle_request_noblock()
            finally:
                self.__shutdown_request = False
                self.__is_shut_down.set()

    BaseServer.shutdown
    サービスを停止foreverサイクル__is_shut_down外部に通知する、ループが終了したことに注意するthreading.Event()の使い方は、一度だけ設定し、Eventを使った頻繁な設定/クリアは避けます.serve_とforeverは異なるスレッドで呼び出されます.shutdownを呼び出すとwait信号量が必要になるので、プログラムはblockし、block後serve_foreverはserveを実行できませんforever設定信号量を終了するには、リクエストを受信します.
    注意self.__shutdown_requestの読み書き操作は、原子操作に属し、マルチスレッドで使用するのは安全です.
        def shutdown(self):
            """Stops the serve_forever loop.
    
            Blocks until the loop has finished. This must be called while
            serve_forever() is running in another thread, or it will
            deadlock.
            """
            self.__shutdown_request = True
            self.__is_shut_down.wait()

    BaseServer.handle_request
    とserve_forever並列の関数server_を呼び出さなければforever、外でhandleをループ呼び出します.request
        # The distinction between handling, getting, processing and
        # finishing a request is fairly arbitrary.  Remember:
        #
        # - handle_request() is the top-level call.  It calls
        #   select, get_request(), verify_request() and process_request()
        # - get_request() is different for stream or datagram sockets
        # - process_request() is the place that may fork a new process
        #   or create a new thread to finish the request
        # - finish_request() instantiates the request handler class;
        #   this constructor will handle the request all by itself
    
        def handle_request(self):
            """Handle one request, possibly blocking.
    
            Respects self.timeout.
            """
            # Support people who used socket.settimeout() to escape
            # handle_request before self.timeout was available.
    
            #        socket.settimeout()        ,       
            timeout = self.socket.gettimeout()
            if timeout is None:
                timeout = self.timeout
            elif self.timeout is not None:
                timeout = min(timeout, self.timeout)
            # select,    ,       
            fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
            if not fd_sets[0]:
                self.handle_timeout()
                return
            #     
            self._handle_request_noblock()

    BaseServer._handle_request_noblock
    真のリクエスト処理関数
  • get_request:受信要求accept
  • verify_request:ipフィルタ
  • などの検証作業を行います.
  • process_request:要求を処理し、サブクラスがメソッドを書き換えた後、SocketServerを呼び出す必要がある.BaseServer.process_request,
  • BaseServer.process_requestにはBaseRequestHandlerのコールバック動作があり、ユーザ定義handlerをインスタンス化し、__init__handle()への呼び出し
  • を完了する.
  • shutdown_reques:接続を閉じる
  • 
        def _handle_request_noblock(self):
            """Handle one request, without blocking.
    
            I assume that select.select has returned that the socket is
            readable before this function was called, so there should be
            no risk of blocking in get_request().
            """
            try:
                #     
                # get_request      ,       ,   socket
                request, client_address = self.get_request()
            except socket.error:
                return
            if self.verify_request(request, client_address):
                try:
                    self.process_request(request, client_address)
                except:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
            else:
                self.shutdown_request(request)

    BaseServer Overridden functions
        def handle_timeout(self):
            """Called if no new request arrives within self.timeout.
    
            Overridden by ForkingMixIn.
            """
            pass
    
        def verify_request(self, request, client_address):
            """Verify the request.  May be overridden.
    
            Return True if we should proceed with this request.
    
            """
            return True
    
        def process_request(self, request, client_address):
            """Call finish_request.
    
            Overridden by ForkingMixIn and ThreadingMixIn.
    
            """
            self.finish_request(request, client_address)
            self.shutdown_request(request)
    
        def server_close(self):
            """Called to clean-up the server.
    
            May be overridden.
    
            """
            pass
    
        def finish_request(self, request, client_address):
            """Finish one request by instantiating RequestHandlerClass."""
            self.RequestHandlerClass(request, client_address, self)
    
        def shutdown_request(self, request):
            """Called to shutdown and close an individual request."""
            self.close_request(request)
    
        def close_request(self, request):
            """Called to clean up an individual request."""
            pass
    
        def handle_error(self, request, client_address):
            """Handle an error gracefully.  May be overridden.
    
            The default is to print a traceback and continue.
    
            """
            print '-'*40
            print 'Exception happened during processing of request from',
            print client_address
            import traceback
            traceback.print_exc() # XXX But this goes to stderr!
            print '-'*40

    TCPServer
    shutdown_requestはまずsocketを呼び出す.shutdown後にsocketを呼び出す.close
  • close()releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, callshutdown() beforeclose().
  • Shut down one or both halves of the connection. If how is SHUT_RD, further receives are disallowed. If how is SHUT_WR, further sends are disallowed. Ifhow is SHUT_RDWR, further sends and receives are disallowed. Depending on the platform, shutting down one half of the connection can also close the opposite half (e.g. on Mac OS X, shutdown(SHUT_WR) does not allow further reads on the other end of the connection).
  • class TCPServer(BaseServer):
    
        address_family = socket.AF_INET
    
        socket_type = socket.SOCK_STREAM
    
        request_queue_size = 5
    
        allow_reuse_address = False
    
        def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
            """Constructor.  May be extended, do not override."""
            BaseServer.__init__(self, server_address, RequestHandlerClass)
            self.socket = socket.socket(self.address_family,
                                        self.socket_type)
            if bind_and_activate:
                try:
                    self.server_bind()
                    self.server_activate()
                except:
                    self.server_close()
                    raise
    
        def server_bind(self):
            """Called by constructor to bind the socket.
    
            May be overridden.
    
            """
            if self.allow_reuse_address:
                self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(self.server_address)
            self.server_address = self.socket.getsockname()
    
        def server_activate(self):
            """Called by constructor to activate the server.
    
            May be overridden.
    
            """
            self.socket.listen(self.request_queue_size)
    
        def server_close(self):
            """Called to clean-up the server.
    
            May be overridden.
    
            """
            self.socket.close()
    
        def fileno(self):
            """Return socket file number.
    
            Interface required by select().
    
            """
            return self.socket.fileno()
    
        def get_request(self):
            """Get the request and client address from the socket.
    
            May be overridden.
    
            """
            return self.socket.accept()
    
        #    shutdown     close,         
        def shutdown_request(self, request):
            """Called to shutdown and close an individual request."""
            try:
                #explicitly shutdown.  socket.close() merely releases
                #the socket and waits for GC to perform the actual close.
                request.shutdown(socket.SHUT_WR)
            except socket.error:
                pass #some platforms may raise ENOTCONN here
            self.close_request(request)
    
        def close_request(self, request):
            """Called to clean up an individual request."""
            request.close()

    UDPServer
    UDPServer get_requestは(data,socket)tupleを返し、TCPServerはsocket handleでmsg, sock = self.request msgを区別して取得し、追加recvを必要としない.
    データの転送には、socketのsendto()とrecvfrom()メソッドを使用する必要があります.従来のsend()とrecv()も同様の効果を達成できるが,前の2つの方法はUDP接続にとってより一般的である.
    from python3-cookbook
    from SocketServer import BaseRequestHandler, UDPServer
    import time
    
    class TimeHandler(BaseRequestHandler):
        def handle(self):
            print('Got connection from', self.client_address)
            # Get message and client socket
            msg, sock = self.request
            resp = time.ctime()
            sock.sendto(resp.encode('ascii'), self.client_address)
    
    if __name__ == '__main__':
        serv = UDPServer(('', 20000), TimeHandler)
        serv.serve_forever()
    
    #-----------------------------
    >>> from socket import socket, AF_INET, SOCK_DGRAM
    >>> s = socket(AF_INET, SOCK_DGRAM)
    >>> s.sendto(b'', ('localhost', 20000))
    0
    >>> s.recvfrom(8192)
    ('Thu Dec 20 10:01:01 2018', ('127.0.0.1', 20000))
    
    class UDPServer(TCPServer):
    
        """UDP server class."""
    
        allow_reuse_address = False
    
        socket_type = socket.SOCK_DGRAM
    
        max_packet_size = 8192
    
        def get_request(self):
            data, client_addr = self.socket.recvfrom(self.max_packet_size)
            return (data, self.socket), client_addr
    
        def server_activate(self):
            # No need to call listen() for UDP.
            pass
    
        def shutdown_request(self, request):
            # No need to shutdown anything.
            self.close_request(request)
    
        def close_request(self, request):
            # No need to close anything.
            pass

    ForkingMixIn
    典型的なforkは、forkマルチプロセスの典型的な使用を見ることができます.
  • は最大プロセス数を限定し、システム資源が
  • を消費しないことを保証する.
  • 親プロセスwait defunctプロセス
  • forkの後、親プロセスは
  • を返します.
  • サブプロセス処理要求後_exit()
  • class ForkingMixIn:
    
        """Mix-in class to handle each request in a new process."""
    
        timeout = 300
        active_children = None
        max_children = 40
    
        def collect_children(self):
            """Internal routine to wait for children that have exited."""
            if self.active_children is None:
                return
    
            while len(self.active_children) >= self.max_children:
                try:
                    pid, _ = os.waitpid(-1, 0)
                    self.active_children.discard(pid)
                except OSError as e:
                    if e.errno == errno.ECHILD:
                        # we don't have any children, we're done
                        self.active_children.clear()
                    elif e.errno != errno.EINTR:
                        break
    
            # Now reap all defunct children.
            for pid in self.active_children.copy():
                try:
                    pid, _ = os.waitpid(pid, os.WNOHANG)
                    # if the child hasn't exited yet, pid will be 0 and ignored by
                    # discard() below
                    self.active_children.discard(pid)
                except OSError as e:
                    if e.errno == errno.ECHILD:
                        # someone else reaped it
                        self.active_children.discard(pid)
    
        def handle_timeout(self):
            """Wait for zombies after self.timeout seconds of inactivity.
    
            May be extended, do not override.
            """
            self.collect_children()
    
        def process_request(self, request, client_address):
            """Fork a new subprocess to process the request."""
            self.collect_children()
            pid = os.fork()
            if pid:
                # Parent process
                if self.active_children is None:
                    self.active_children = set()
                self.active_children.add(pid)
                self.close_request(request) #close handle in parent process
                return
            else:
                # Child process.
                # This must never return, hence os._exit()!
                try:
                    self.finish_request(request, client_address)
                    self.shutdown_request(request)
                    os._exit(0)
                except:
                    try:
                        self.handle_error(request, client_address)
                        self.shutdown_request(request)
                    finally:
                        os._exit(1)

    ThreadingMixIn
    ThreadingMixInはプロセスを再ロードしました.request関数
  • スレッド
  • を作成
  • スレッドにおける処理要求
  • 起動スレッド
  • 
    class ThreadingMixIn:
        """Mix-in class to handle each request in a new thread."""
    
        # Decides how threads will act upon termination of the
        # main process
        daemon_threads = False
    
        def process_request_thread(self, request, client_address):
            """Same as in BaseServer but as a thread.
    
            In addition, exception handling is done here.
    
            """
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
    
        def process_request(self, request, client_address):
            """Start a new thread to process the request."""
            t = threading.Thread(target = self.process_request_thread,
                                 args = (request, client_address))
            t.daemon = self.daemon_threads
            t.start()
    
    class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    class ForkingTCPServer(ForkingMixIn, TCPServer): pass
    
    class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
    class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
    
    if hasattr(socket, 'AF_UNIX'):
    
        class UnixStreamServer(TCPServer):
            address_family = socket.AF_UNIX
    
        class UnixDatagramServer(UDPServer):
            address_family = socket.AF_UNIX
    
        class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
    
        class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass

    BaseRequestHandler
    ベースリクエストクラス、外部に3つのインタフェースを提供
  • setup()
  • handle()
  • finish()

  • 使用時にこのクラスを継承し、BaseServerを介してBaseServerを登録する.finish_requestでBaseRequestHandlerクラスをインスタンス化し、_init__関数呼び出しでクラスの再ロードを継承するhandle()インタフェースの呼び出しを完了
    class BaseRequestHandler:
    
        def __init__(self, request, client_address, server):
            self.request = request
            self.client_address = client_address
            self.server = server
            self.setup()
            try:
                self.handle()
            finally:
                self.finish()
    
        def setup(self):
            pass
    
        def handle(self):
            pass
    
        def finish(self):
            pass

    StreamRequestHandler
    ファイル操作インタフェースの提供
    class StreamRequestHandler(BaseRequestHandler):
    
        """Define self.rfile and self.wfile for stream sockets."""
    
        # Default buffer sizes for rfile, wfile.
        # We default rfile to buffered because otherwise it could be
        # really slow for large data (a getc() call per byte); we make
        # wfile unbuffered because (a) often after a write() we want to
        # read and we need to flush the line; (b) big writes to unbuffered
        # files are typically optimized by stdio even when big reads
        # aren't.
        rbufsize = -1
        wbufsize = 0
    
        # A timeout to apply to the request socket, if not None.
        timeout = None
    
        # Disable nagle algorithm for this socket, if True.
        # Use only when wbufsize != 0, to avoid small packets.
        disable_nagle_algorithm = False
    
        def setup(self):
            self.connection = self.request
            if self.timeout is not None:
                self.connection.settimeout(self.timeout)
            if self.disable_nagle_algorithm:
                self.connection.setsockopt(socket.IPPROTO_TCP,
                                           socket.TCP_NODELAY, True)
            self.rfile = self.connection.makefile('rb', self.rbufsize)
            self.wfile = self.connection.makefile('wb', self.wbufsize)
    
        def finish(self):
            if not self.wfile.closed:
                try:
                    self.wfile.flush()
                except socket.error:
                    # A final socket error may have occurred here, such as
                    # the local error ECONNABORTED.
                    pass
            self.wfile.close()
            self.rfile.close()
    

    DatagramRequestHandler
    class DatagramRequestHandler(BaseRequestHandler):
    
        """Define self.rfile and self.wfile for datagram sockets."""
    
        def setup(self):
            try:
                from cStringIO import StringIO
            except ImportError:
                from StringIO import StringIO
            self.packet, self.socket = self.request
            self.rfile = StringIO(self.packet)
            self.wfile = StringIO()
    
        def finish(self):
            self.socket.sendto(self.wfile.getvalue(), self.client_address)
    

    著作権
    著者:bigfishライセンス契約:ライセンス契約知識共有署名-非商業的使用4.0国際ライセンス契約