selectモジュールで実現されたsocket server
5250 ワード
ソケットサービス
前のノートには記録が乱れていたので、最後にクラスを書いて、モジュールのようにパッケージしてみました.使用するときは継承によってサブクラスを生成しrun実行を呼び出す.一部のメソッドを再構築する必要があります.また、サブクラスに新しいメソッドを作成する必要があります.少なくともonrecvメソッドを再構築し,データを受信した後の処理が必要である.また、データを送信するにはsend_を呼び出します.dataインタフェースは、conn接続とbytesタイプのdataを転送します.
クライアントのテスト
次は、マルチスレッドによる同時実行をテストするクライアントプログラムです.
前のノートには記録が乱れていたので、最後にクラスを書いて、モジュールのようにパッケージしてみました.使用するときは継承によってサブクラスを生成しrun実行を呼び出す.一部のメソッドを再構築する必要があります.また、サブクラスに新しいメソッドを作成する必要があります.少なくともonrecvメソッドを再構築し,データを受信した後の処理が必要である.また、データを送信するにはsend_を呼び出します.dataインタフェースは、conn接続とbytesタイプのdataを転送します.
import logging
import queue
import select
import socket
class Server(object):
def __init__(self, host, port, data_size=1024):
self.host = host
self.port = port
self.data_size = data_size
self.server = socket.socket()
self.server.setblocking(False)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((self.host, self.port))
self.server.listen()
logging.critical(" ")
self.inputs = [self.server, ]
self.outputs = []
self.data_queue = {}
def run(self):
while True:
self.loop()
def loop(self):
""" select
run
:
,
"""
readable, writeable, exceptional = select.select(self.inputs, self.outputs, self.inputs)
logging.debug("select : {readable:%s, writeable:%s, exceptional:%s}"
% (readable, writeable, exceptional))
for r in readable:
if r is self.server:
conn, addr = r.accept()
logging.info(" : {conn: %s, addr: %s}" % (conn, addr))
conn.setblocking(False)
self.inputs.append(conn)
self.data_queue[conn] = queue.Queue()
else:
try:
data = r.recv(self.data_size)
except ConnectionResetError as e:
logging.error("recv : %s" % e)
self.clean(r)
if r in writeable: writeable.remove(r)
except Exception as e:
logging.critical("recv : %s" % e)
self.clean(r)
else:
if data: #
logging.debug(data)
self.onrecv(r, data)
else: # ,
logging.info(" : %s" % r)
self.clean(r)
# writeable writeable
if r in writeable: writeable.remove(r)
for w in writeable:
try:
data = self.data_queue[w].get_nowait()
except KeyError as e: # , ,
logging.error(" : %s" % e)
# self.clean(w) #
except queue.Empty: # , remove
# if w in self.outputs: self.outputs.remove(w)
self.outputs.remove(w) # , 。 KeyError
except Exception as e:
logging.critical(" : %s" % e)
self.clean(w)
else:
if data:
try:
w.sendall(data) # , send remove。
except ConnectionResetError as e:
logging.error("send : %s" % e)
except Exception as e:
logging.critical("send : %s" % e)
for e in exceptional:
logging.critical(" : %s" % e)
self.clean(e)
def clean(self, conn):
"""
4
1. ,
2. , ,
3.
4. ,
:
,
, ,
"""
if conn in self.inputs: self.inputs.remove(conn)
if conn in self.outputs: self.outputs.remove(conn)
conn.close()
if conn in self.data_queue: del self.data_queue[conn]
def onrecv(self, conn, data):
"""
send_data ,
"""
self.send_data(conn, data)
def send_data(self, conn, data):
""" """
if conn not in self.outputs: self.outputs.append(conn)
self.data_queue[conn].put(data)
if __name__ == '__main__':
Server('localhost', 9999).run()
クライアントのテスト
次は、マルチスレッドによる同時実行をテストするクライアントプログラムです.
import socket
import threading
HOST = 'localhost'
PORT = 9999
def client(i):
client = socket.socket()
client.connect((HOST, PORT))
for j in range(500):
msg = "hello %s %s" % (i, j)
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print('Received:', data.decode('utf-8'))
client.close()
if __name__ == '__main__':
for i in range(50):
t = threading.Thread(target=client, args=(i,))
t.start()