selectモジュールで実現されたsocket server


ソケットサービス
前のノートには記録が乱れていたので、最後にクラスを書いて、モジュールのようにパッケージしてみました.使用するときは継承によってサブクラスを生成しrun実行を呼び出す.一部のメソッドを再構築する必要があります.また、サブクラスに新しいメソッドを作成する必要があります.少なくともonrecvメソッドを再構築し,データを受信した後の処理が必要である.また、データを送信するにはsend_を呼び出します.dataインタフェースは、conn接続とbytesタイプのdataを転送します.
import logging
import queue
import select
import socket

class Server(object):
    def __init__(self, host, port, data_size=1024):
        self.host = host
        self.port = port
        self.data_size = data_size

        self.server = socket.socket()
        self.server.setblocking(False)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((self.host, self.port))
        self.server.listen()
        logging.critical("      ")

        self.inputs = [self.server, ]
        self.outputs = []
        self.data_queue = {}

    def run(self):
        while True:
            self.loop()

    def loop(self):
        """    select   
        run              
               :
                     ,                
        """
        readable, writeable, exceptional = select.select(self.inputs, self.outputs, self.inputs)
        logging.debug("select  : {readable:%s, writeable:%s, exceptional:%s}"
                      % (readable, writeable, exceptional))
        for r in readable:
            if r is self.server:
                conn, addr = r.accept()
                logging.info("          : {conn: %s, addr: %s}" % (conn, addr))
                conn.setblocking(False)
                self.inputs.append(conn)
                self.data_queue[conn] = queue.Queue()
            else:
                try:
                    data = r.recv(self.data_size)
                except ConnectionResetError as e:
                    logging.error("recv      : %s" % e)
                    self.clean(r)
                    if r in writeable: writeable.remove(r)
                except Exception as e:
                    logging.critical("recv       : %s" % e)
                    self.clean(r)
                else:
                    if data:  #        
                        logging.debug(data)
                        self.onrecv(r, data)
                    else:  #        ,        
                        logging.info("      : %s" % r)
                        self.clean(r)
                        #   writeable    writeable     
                        if r in writeable: writeable.remove(r)

        for w in writeable:
            try:
                data = self.data_queue[w].get_nowait()
            except KeyError as e:  #        ,                  ,          
                logging.error("            : %s" % e)
                # self.clean(w)  #             
            except queue.Empty:  #                ,     remove
                # if w in self.outputs: self.outputs.remove(w)
                self.outputs.remove(w)  #         ,      。        KeyError
            except Exception as e:
                logging.critical("             : %s" % e)
                self.clean(w)
            else:
                if data:
                    try:
                        w.sendall(data)  #        ,  send        remove。         
                    except ConnectionResetError as e:
                        logging.error("send      : %s" % e)
                    except Exception as e:
                        logging.critical("send       : %s" % e)

        for e in exceptional:
            logging.critical("       : %s" % e)
            self.clean(e)

    def clean(self, conn):
        """         
                 4 
        1.        ,         
        2.        ,          ,       
        3.       
        4.                ,          
               :
                              ,
               ,           ,        
        """
        if conn in self.inputs: self.inputs.remove(conn)
        if conn in self.outputs: self.outputs.remove(conn)
        conn.close()
        if conn in self.data_queue: del self.data_queue[conn]

    def onrecv(self, conn, data):
        """            
               
             send_data  ,    
        """
        self.send_data(conn, data)

    def send_data(self, conn, data):
        """    """
        if conn not in self.outputs: self.outputs.append(conn)
        self.data_queue[conn].put(data)

if __name__ == '__main__':
    Server('localhost', 9999).run()

クライアントのテスト
次は、マルチスレッドによる同時実行をテストするクライアントプログラムです.
import socket
import threading

HOST = 'localhost'
PORT = 9999
def client(i):
    client = socket.socket()
    client.connect((HOST, PORT))
    for j in range(500):
        msg = "hello %s %s" % (i, j)
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print('Received:', data.decode('utf-8'))
    client.close()

if __name__ == '__main__':
    for i in range(50):
        t = threading.Thread(target=client, args=(i,))
        t.start()