SocketServer Source Code Analysis
SocketServer.py
Creating network servers.
Contents
SocketServer.py
    file head
    BaseServer
        BaseServer.serve_forever
        BaseServer.shutdown
        BaseServer.handle_request
        BaseServer._handle_request_noblock
        BaseServer Overridden functions
    TCPServer
    UDPServer
    ForkingMixIn
    ThreadingMixIn
    BaseRequestHandler
    StreamRequestHandler
    DatagramRequestHandler
    Copyright
file head
BaseServer
RequestHandlerClass is how the handle function is registered: finish_request instantiates the class, and the instance calls the user-defined handle function.
BaseServer.serve_forever
The service loop: it listens on the port and handles incoming requests.
BaseServer.shutdown
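Stops the serve_forever loop.
Note: it must be called from a thread other than the one running serve_forever, otherwise it deadlocks.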
BaseServer.handle_request
A function parallel to serve_forever. If you do not call serve_forever, call handle_request in a loop of your own.
BaseServer._handle_request_noblock
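The function that actually processes a request. It goes through these steps:
get_request: receives the request (accept).
verify_request: performs validation work such as IP filtering.
process_request: processes the request. A subclass that overrides this method must still make sure finish_request runs, as SocketServer.BaseServer.process_request does. finish_request is the callback into BaseRequestHandler: it instantiates the user-defined handler, and the call to handle() is completed in the handler's __init__.
shutdown_request: closes the connection.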
BaseServer Overridden functions
TCPServer
shutdown_request first calls socket.shutdown and then socket.close. The Python socket documentation explains why:
close() releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, call shutdown() before close().
shutdown(how): Shut down one or both halves of the connection. If how is SHUT_RD, further receives are disallowed. If how is SHUT_WR, further sends are disallowed. If how is SHUT_RDWR, further sends and receives are disallowed. Depending on the platform, shutting down one half of the connection can also close the opposite half (e.g. on Mac OS X, shutdown(SHUT_WR) does not allow further reads on the other end of the connection).
UDPServer
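UDPServer.get_request returns a (data, socket) tuple as the request, whereas TCPServer returns the connected socket. A UDP handler therefore obtains the message by unpacking self.request and does not need an extra recv.
To transfer data, use the socket's sendto() and recvfrom() methods. The traditional send() and recv() can achieve a similar effect, but the former two are more commonly used with UDP.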
from python3-cookbook
ForkingMixIn
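A typical use of fork: this class shows the classic multi-process fork pattern. max_children limits the maximum number of processes so that system resources are not exhausted. The parent process waits for defunct (zombie) child processes. After fork, the parent returns immediately; the child handles the request and then calls os._exit().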
ThreadingMixIn
ThreadingMixIn overrides the process_request function: it creates a thread, handles the request inside that thread, and starts the thread.
BaseRequestHandler
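The base request handler class. It exposes three interfaces: setup(), handle(), and finish().
To use it, subclass this class and register the subclass with BaseServer. BaseServer.finish_request instantiates the handler class, and the handler's __init__ completes the call to the handle() method overridden by the subclass.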
StreamRequestHandler
Provides a file-like interface to the connection (rfile and wfile).
DatagramRequestHandler
Copyright
Author: bigfish. License: Creative Commons Attribution-NonCommercial 4.0 International License.
file head
__version__ = "0.4"

import socket
import select
import sys
import os
import errno
try:
    import threading
except ImportError:
    import dummy_threading as threading

__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
           "StreamRequestHandler","DatagramRequestHandler",
           "ThreadingMixIn", "ForkingMixIn"]
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])

# Retry a system call that was interrupted by EINTR
def _eintr_retry(func, *args):
    """restart a system call interrupted by EINTR"""
    while True:
        try:
            return func(*args)
        except (OSError, select.error) as e:
            if e.args[0] != errno.EINTR:
                raise
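The retry helper can be tried without any real system call. The following is a minimal sketch, not the module's own code: eintr_retry is a Python 3 flavored copy of the helper, and flaky_call is a hypothetical function that fails with EINTR twice before succeeding.

```python
import errno

def eintr_retry(func, *args):
    """Call func(*args) again whenever it fails with EINTR."""
    while True:
        try:
            return func(*args)
        except OSError as e:
            # Any error other than EINTR is a real failure: re-raise it.
            if e.errno != errno.EINTR:
                raise

calls = []

def flaky_call(value):
    # Hypothetical stand-in for an interrupted system call:
    # the first two attempts raise EINTR, the third succeeds.
    calls.append(value)
    if len(calls) < 3:
        raise OSError(errno.EINTR, "Interrupted system call")
    return value * 2

result = eintr_retry(flaky_call, 21)
print(result, len(calls))
```

The caller sees only the final result; the two interrupted attempts are absorbed by the loop.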
BaseServer
RequestHandlerClass is how the handle function is registered: finish_request instantiates the class, and the instance calls the user-defined handle function.
class BaseServer:
    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor. May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.
        May be overridden.
        """
        pass
BaseServer.serve_forever
The service loop: it listens on the port and handles incoming requests.
    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.
        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # Wait in select() until the server socket is readable,
                # retrying if the call is interrupted by EINTR
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                # A request is pending: handle it
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
BaseServer.shutdown
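Stops the serve_forever loop.
__is_shut_down notifies the outside world that the loop has ended. Note how threading.Event() is used here: it is set once when the loop exits, which avoids frequently setting and clearing the Event. shutdown and serve_forever must be called from different threads. shutdown waits on the event, so the calling thread blocks; if that were the thread running serve_forever, the loop could no longer receive requests or reach the point where it exits and sets the event, and the program would deadlock.
Note: reading and writing self.__shutdown_request are atomic operations, so using the flag from multiple threads is safe.
    def shutdown(self):
        """Stops the serve_forever loop.
        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()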
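The two-thread requirement can be sketched as follows. This is an illustrative example under stated assumptions: it imports the module under its Python 3 name socketserver (falling back to SocketServer), EchoHandler is a hypothetical handler, and port 0 asks the OS for any free loopback port.

```python
import socket
import threading

try:
    import socketserver                  # Python 3 name of the module
except ImportError:
    import SocketServer as socketserver  # Python 2 name used in this article

class EchoHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler: send back whatever the client sent."""
    def handle(self):
        data = self.request.recv(1024)
        self.request.sendall(data)

server = socketserver.TCPServer(("127.0.0.1", 0), EchoHandler)
host, port = server.server_address

# serve_forever runs in a worker thread so that the main thread
# stays free to call shutdown() later.
worker = threading.Thread(target=server.serve_forever,
                          kwargs={"poll_interval": 0.05})
worker.start()

client = socket.create_connection((host, port))
client.sendall(b"ping")
reply = client.recv(1024)
client.close()

server.shutdown()      # blocks until the serve_forever loop has exited
worker.join()
server.server_close()  # release the listening socket
print(reply)
```

Calling server.shutdown() from inside a handler running on the serve_forever thread would block forever, which is the deadlock the docstring warns about.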
BaseServer.handle_request
A function parallel to serve_forever. If you do not call serve_forever, call handle_request in a loop of your own.
    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary. Remember:
    #
    # - handle_request() is the top-level call. It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself
    def handle_request(self):
        """Handle one request, possibly blocking.
        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        # A timeout set with socket.settimeout() is honored as well;
        # the smaller of the two values wins.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        # Wait in select(); if nothing arrives before the timeout,
        # call handle_timeout() and return
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        # The socket is readable: handle the request
        self._handle_request_noblock()
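Driving the server from your own loop might look like the sketch below. It assumes the Python 3 module name socketserver (with a fallback to SocketServer); CountingServer and its timeouts counter are hypothetical names, and no client ever connects, so every call ends in handle_timeout.

```python
try:
    import socketserver                  # Python 3 name of the module
except ImportError:
    import SocketServer as socketserver  # Python 2 name used in this article

class CountingServer(socketserver.TCPServer):
    """Hypothetical server that counts how often it timed out."""
    timeout = 0.05   # seconds handle_request() waits for a request
    timeouts = 0

    def handle_timeout(self):
        # Called by handle_request() when no request arrived in time.
        self.timeouts += 1

server = CountingServer(("127.0.0.1", 0), socketserver.BaseRequestHandler)

# The caller owns the loop: each iteration handles at most one request
# and returns after self.timeout seconds if nothing arrives.
for _ in range(3):
    server.handle_request()

server.server_close()
print(server.timeouts)
```

Unlike serve_forever, this style lets the caller do periodic work between iterations in the same thread.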
BaseServer._handle_request_noblock
The function that actually processes a request. The call to handle() is made in the handler's __init__.
    def _handle_request_noblock(self):
        """Handle one request, without blocking.
        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            # get_request() receives the request and returns it together
            # with the client address; for TCP the request is a socket
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
        else:
            self.shutdown_request(request)
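The verify_request step is the natural place for an address filter. The sketch below is illustrative only: FilteringServer, its blocked set, and RecordingHandler are hypothetical names, and the module is imported under its Python 3 name socketserver with a fallback to SocketServer.

```python
import socket

try:
    import socketserver                  # Python 3 name of the module
except ImportError:
    import SocketServer as socketserver  # Python 2 name used in this article

handled = []

class RecordingHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Record the IP address of every client that got through.
        handled.append(self.client_address[0])

class FilteringServer(socketserver.TCPServer):
    blocked = {"127.0.0.1"}   # hypothetical deny list

    def verify_request(self, request, client_address):
        # Returning False makes the server skip process_request()
        # and go straight to shutdown_request().
        return client_address[0] not in self.blocked

server = FilteringServer(("127.0.0.1", 0), RecordingHandler)

def connect_and_serve():
    client = socket.create_connection(server.server_address)
    server.handle_request()   # get_request, verify_request, ...
    client.close()

connect_and_serve()                  # rejected: handler never runs
handled_while_blocked = len(handled)

server.blocked = set()               # lift the filter
connect_and_serve()                  # accepted: handler runs
server.server_close()
print(handled_while_blocked, handled)
```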
BaseServer Overridden functions
    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.
        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request. May be overridden.
        Return True if we should proceed with this request.
        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.
        Overridden by ForkingMixIn and ThreadingMixIn.
        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.
        May be overridden.
        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully. May be overridden.
        The default is to print a traceback and continue.
        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40
TCPServer
shutdown_request first calls socket.shutdown and then socket.close.
class TCPServer(BaseServer):
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    request_queue_size = 5
    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor. May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.
        May be overridden.
        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.
        May be overridden.
        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.
        May be overridden.
        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.
        Interface required by select().
        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.
        May be overridden.
        """
        return self.socket.accept()

    # Call shutdown() before close() so that the connection is
    # closed promptly
    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown. socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
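What shutdown(SHUT_WR) means for the peer can be observed on a pair of connected sockets. This is a minimal sketch, assuming socket.socketpair() is available on the platform; the variable names are illustrative.

```python
import socket

# Two already-connected stream sockets; a plays the server side.
a, b = socket.socketpair()

a.sendall(b"bye")
a.shutdown(socket.SHUT_WR)   # no further sends from a; the peer will see EOF

first = b.recv(16)           # the data sent before the shutdown
second = b.recv(16)          # empty bytes: end of stream reached

a.close()
b.close()
print(first, second)
```

The peer learns that the stream has ended as soon as it has read the pending data, without waiting for the socket object on the other side to be closed or garbage collected.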
UDPServer
UDPServer.get_request returns a (data, socket) tuple as the request, whereas TCPServer returns the connected socket. A handler tells the two apart by unpacking
    msg, sock = self.request
which yields msg directly, so no extra recv is needed. To transfer data, use the socket's sendto() and recvfrom() methods. The traditional send() and recv() can achieve a similar effect, but the former two are more commonly used with UDP.
from python3-cookbook
from SocketServer import BaseRequestHandler, UDPServer
import time

class TimeHandler(BaseRequestHandler):
    def handle(self):
        print('Got connection from', self.client_address)
        # Get message and client socket
        msg, sock = self.request
        resp = time.ctime()
        sock.sendto(resp.encode('ascii'), self.client_address)

if __name__ == '__main__':
    serv = UDPServer(('', 20000), TimeHandler)
    serv.serve_forever()
#-----------------------------
>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
('Thu Dec 20 10:01:01 2018', ('127.0.0.1', 20000))
class UDPServer(TCPServer):
    """UDP server class."""
    allow_reuse_address = False
    socket_type = socket.SOCK_DGRAM
    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass
ForkingMixIn
A typical use of fork: this class shows the classic multi-process fork pattern. After handling the request, the child process leaves through os._exit().
class ForkingMixIn:
    """Mix-in class to handle each request in a new process."""
    timeout = 300
    active_children = None
    max_children = 40

    def collect_children(self):
        """Internal routine to wait for children that have exited."""
        if self.active_children is None:
            return
        while len(self.active_children) >= self.max_children:
            try:
                pid, _ = os.waitpid(-1, 0)
                self.active_children.discard(pid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # we don't have any children, we're done
                    self.active_children.clear()
                elif e.errno != errno.EINTR:
                    break
        # Now reap all defunct children.
        for pid in self.active_children.copy():
            try:
                pid, _ = os.waitpid(pid, os.WNOHANG)
                # if the child hasn't exited yet, pid will be 0 and ignored by
                # discard() below
                self.active_children.discard(pid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # someone else reaped it
                    self.active_children.discard(pid)

    def handle_timeout(self):
        """Wait for zombies after self.timeout seconds of inactivity.
        May be extended, do not override.
        """
        self.collect_children()

    def process_request(self, request, client_address):
        """Fork a new subprocess to process the request."""
        self.collect_children()
        pid = os.fork()
        if pid:
            # Parent process
            if self.active_children is None:
                self.active_children = set()
            self.active_children.add(pid)
            self.close_request(request) #close handle in parent process
            return
        else:
            # Child process.
            # This must never return, hence os._exit()!
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                os._exit(0)
            except:
                try:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
                finally:
                    os._exit(1)
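The parent/child split in process_request can be reduced to a few lines. The following sketch only illustrates the pattern and assumes a platform that provides os.fork (Unix-like systems); run_in_child, wait_exit_code, and failing_work are hypothetical helpers, not part of the module.

```python
import os

def run_in_child(work):
    """Fork; the child runs work() and always leaves through os._exit()."""
    pid = os.fork()
    if pid == 0:
        # Child process: it must never return into the caller's code.
        try:
            work()
            os._exit(0)
        except BaseException:
            os._exit(1)
    # Parent process: return at once with the child's pid.
    return pid

def wait_exit_code(pid):
    """Reap the child so that it does not linger as a zombie."""
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)

def failing_work():
    raise ValueError("boom")

ok_code = wait_exit_code(run_in_child(lambda: None))
bad_code = wait_exit_code(run_in_child(failing_work))
print(ok_code, bad_code)
```

The sketch waits with a blocking waitpid for simplicity, whereas collect_children uses os.WNOHANG so that the server is not stalled by children that are still running.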
ThreadingMixIn
ThreadingMixIn overrides the process_request function.
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""
    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.
        In addition, exception handling is done here.
        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

if hasattr(socket, 'AF_UNIX'):
    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX

    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
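That the handler really runs in a separate thread can be checked with a small experiment. This is a sketch under assumptions: the module is imported under its Python 3 name socketserver (falling back to SocketServer), and WhoAmI, handler_threads, and done are hypothetical names.

```python
import socket
import threading

try:
    import socketserver                  # Python 3 name of the module
except ImportError:
    import SocketServer as socketserver  # Python 2 name used in this article

handler_threads = []
done = threading.Event()

class WhoAmI(socketserver.BaseRequestHandler):
    def handle(self):
        # Record which thread executes the handler.
        handler_threads.append(threading.current_thread())
        done.set()

server = socketserver.ThreadingTCPServer(("127.0.0.1", 0), WhoAmI)
client = socket.create_connection(server.server_address)

# handle_request() returns once the worker thread has been started;
# it does not wait for the handler to finish.
server.handle_request()
finished = done.wait(5)

ran_in_other_thread = handler_threads[0] is not threading.current_thread()
client.close()
server.server_close()
print(finished, ran_in_other_thread)
```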
BaseRequestHandler
The base request handler class. It exposes three interfaces: setup(), handle(), and finish().
To use it, subclass this class and register the subclass with BaseServer. BaseServer.finish_request instantiates the handler class, and the handler's __init__ completes the call to the handle() method overridden by the subclass.
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass
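Because all the work happens in __init__, the call order can be observed without any network traffic. In this minimal sketch TracingHandler is a hypothetical subclass, and a plain list stands in for the request object purely for illustration; the module is imported under its Python 3 name socketserver with a fallback to SocketServer.

```python
try:
    import socketserver                  # Python 3 name of the module
except ImportError:
    import SocketServer as socketserver  # Python 2 name used in this article

class TracingHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler that logs each lifecycle step."""
    def setup(self):
        self.request.append("setup")

    def handle(self):
        self.request.append("handle")
        raise RuntimeError("handler failed")

    def finish(self):
        self.request.append("finish")

calls = []   # stand-in for the request; a real server passes a socket
try:
    # Instantiating the class is what runs setup(), handle(), finish().
    TracingHandler(calls, ("127.0.0.1", 0), None)
except RuntimeError:
    calls.append("error propagated")

print(calls)
```

finish() runs even though handle() raised, because of the try/finally in __init__; the exception then propagates to the server, which reports it through handle_error.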
StreamRequestHandler
Provides a file-like interface to the connection (rfile and wfile).
class StreamRequestHandler(BaseRequestHandler):
    """Define self.rfile and self.wfile for stream sockets."""
    # Default buffer sizes for rfile, wfile.
    # We default rfile to buffered because otherwise it could be
    # really slow for large data (a getc() call per byte); we make
    # wfile unbuffered because (a) often after a write() we want to
    # read and we need to flush the line; (b) big writes to unbuffered
    # files are typically optimized by stdio even when big reads
    # aren't.
    rbufsize = -1
    wbufsize = 0
    # A timeout to apply to the request socket, if not None.
    timeout = None
    # Disable nagle algorithm for this socket, if True.
    # Use only when wbufsize != 0, to avoid small packets.
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # A final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()
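With rfile and wfile a handler can treat the connection like a file. The sketch below is illustrative: UpperHandler is a hypothetical handler, the module is imported under its Python 3 name socketserver (falling back to SocketServer), and the server handles exactly one request on a loopback port chosen by the OS.

```python
import socket

try:
    import socketserver                  # Python 3 name of the module
except ImportError:
    import SocketServer as socketserver  # Python 2 name used in this article

class UpperHandler(socketserver.StreamRequestHandler):
    """Hypothetical handler: read one line, answer with it upper-cased."""
    def handle(self):
        line = self.rfile.readline()     # buffered read up to the newline
        self.wfile.write(line.upper())   # unbuffered write (wbufsize = 0)

server = socketserver.TCPServer(("127.0.0.1", 0), UpperHandler)

client = socket.create_connection(server.server_address)
client.sendall(b"hello\n")

server.handle_request()    # accept the connection and run the handler once
reply = client.recv(1024)

client.close()
server.server_close()
print(reply)
```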
DatagramRequestHandler
class DatagramRequestHandler(BaseRequestHandler):
    """Define self.rfile and self.wfile for datagram sockets."""
    def setup(self):
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)
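As the code shows, the incoming packet is exposed through rfile, and whatever the handler writes to wfile is sent back as a single datagram in finish(). A minimal sketch of that round trip, assuming the Python 3 module name socketserver (with a fallback to SocketServer) and a hypothetical ReverseHandler:

```python
import socket

try:
    import socketserver                  # Python 3 name of the module
except ImportError:
    import SocketServer as socketserver  # Python 2 name used in this article

class ReverseHandler(socketserver.DatagramRequestHandler):
    """Hypothetical handler: reply with the packet reversed."""
    def handle(self):
        packet = self.rfile.read()       # the whole received datagram
        self.wfile.write(packet[::-1])   # buffered until finish() sends it

server = socketserver.UDPServer(("127.0.0.1", 0), ReverseHandler)

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.settimeout(5)
client.sendto(b"abc", server.server_address)

server.handle_request()    # receive one datagram and run the handler
reply, _ = client.recvfrom(1024)

client.close()
server.server_close()
print(reply)
```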