Pythonケース-ネットワークプログラミング-スレッドプール
9750 ワード
スレッドプール:
python2.*本物のスレッドプールはありませんpython 3.*機能が弱いため、通常は自分で書くか、サードパーティのソフトウェアで以下の2つのカスタムスレッドプールコードを例に挙げます.
第1種:for循環配合キューによってマルチスレッドの特徴を実現する:1、キューにスレッドを格納する;2、スレッドの使用が終わったら閉じて、それから再創して、繰り返し使用できません;3、プログラムを開くと、スレッドプールは最大オンラインになります.
第二種類:少し複雑な特徴:1、キューにタスクを保存する;2、任務は2種類に分けられる:a、真実の任務;b、閉鎖信号;3、現在のタスク量に基づいてスレッドを作成する.4、スレッドは繰り返し使用できる.
python2.*本物のスレッドプールはありませんpython 3.*機能が弱いため、通常は自分で書くか、サードパーティのソフトウェアで以下の2つのカスタムスレッドプールコードを例に挙げます.
第1種:for循環配合キューによってマルチスレッドの特徴を実現する:1、キューにスレッドを格納する;2、スレッドの使用が終わったら閉じて、それから再創して、繰り返し使用できません;3、プログラムを開くと、スレッドプールは最大オンラインになります.
#!/usr/bin/env python
# -- coding = 'utf-8' --
# Author Allen Lee
# Python Version 3.5.1
# OS Windows 7
import queue
import threading
import time
class ThreadPool:
def __init__(self,maxsize):
self.maxsize = maxsize
self._q = queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread)
def get_thread(self):
return self._q.get()
def add_thread(self):
self._q.put(threading.Thread)
pool = ThreadPool(5)
def task(arg,p):
print(arg)
time.sleep(1)
p.add_thread
for i in range(100):
#threading.Thread
t = pool.get_thread()
obj = t(target=task,args=(i,pool,))
obj.start()
第二種類:少し複雑な特徴:1、キューにタスクを保存する;2、任務は2種類に分けられる:a、真実の任務;b、閉鎖信号;3、現在のタスク量に基づいてスレッドを作成する.4、スレッドは繰り返し使用できる.
#!/usr/bin/env python
# -- coding = 'utf-8' --
# Author Allen Lee
# Python Version 3.5.1
# OS Windows 7
import threading
import time
import queue
import contextlib
stoptarget = object()
#
class ThreadPool(object):
# , maxsize
def __init__(self,maxsize,maxtasklist=None):
if maxtasklist:
self.q = queue.Queue(maxtasklist)
else:
self.q = queue.Queue(maxsize)
self.maxsize = maxsize
#
self.Nclose = False
# ,
self.Iclose = False
#
self.inuse_list = []
#
self.free_list = []
def run(self,task,args,callback=None):
"""
#task
#args
#callback , :1、 ;2、 ,default=None,
# , , True or None
"""
# ,
if self.Nclose:
return
# , , , ,
if len(self.free_list) == 0 and len(self.inuse_list) < self.maxsize:
self.inuse_thread()
obj = (task,args,callback,)
self.q.put(obj)
#
def inuse_thread(self):
opj = threading.Thread(target=self.call)
opj.start()
#
def call(self):
current_thread = threading.currentThread
#
self.inuse_list.append(current_thread)
#
event = self.q.get()
# , , task,args,callback
while event != stoptarget:
task,args,callback = event
# , task , result, success True
try:
result = task(*args)
success = True
# False None
except Exception as f:
success = False
result = None
# success result ,
if callback is not None:
try:
callback(success, result)
except Exception as f:
pass
with self.worker_state(self.free_list, current_thread):
# , stoptarget
if self.Iclose:
event = stoptarget
#
else:
event = self.q.get()
else:
#
self.inuse_list.remove(current_thread)
# ,
def Nclose(self):
self.Nclose = True
full_size = len(self.inuse_list)
while full_size:
self.q.put(stoptarget)
full_size -= 1
# , ,
def Iclose(self):
self.Iclose = True
while self.Iclose:
self.q.put(stoptarget)
self.q.empty()
# ,
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
state_list.append(worker_thread)
#
try:
yield
finally:
state_list.remove(worker_thread)
# 5
pool = ThreadPool(5)
#
def callback(status,result):
print(status,result)
# task
def action(i):
print(i)
#100
for i in range(100):
t = pool.run(action,(i,),callback)
time.sleep(2)
print(len(pool.inuse_list), len(pool.free_list))