Python探索のカスタム実装スレッドプール
5156 ワード
スレッドプールが必要なのはなぜですか?
タスクを使用してサブスレッド処理を開始し、処理が完了した後、サブスレッドを破棄したり、サブスレッドが自然に死亡したりすると、タスクにかかる時間が短くなりますが、タスクの数が多くなると、スレッドの作成と終了に多くの時間がかかり、効率が低下することを想定します.
スレッドプールの原理:
スレッドプール(Thread pool)である以上、名前はイメージ的で、指定された数の利用可能なサブスレッドを1つの「プール」に入れ、タスクがあるときに1つのスレッドを取り出して実行し、タスクが実行された後、すぐにスレッドを破棄するのではなく、スレッドプールに入れ、次のタスクの受信を待つ.これにより、メモリとcpuのオーバーヘッドも小さく、スレッドの数を制御できます.
スレッドプールの実装:
スレッドプールには多くの実装方法があり、pythonではQueue-キューという優れた実装方法が提供されています.pythonではQueue自体が同期しているので、つまりスレッドが安全なので、複数のスレッドに1つのQueueを共有させることが安心できます.
スレッドプールといえば、実行すべきタスクプールがあるはずです.タスクプールには実行すべきタスクが格納されており、各スレッドがタスクプールにタスク実行を取り、Queueでタスクプールを実現するのが最善です.
1.low版スレッドプール
設計構想:キューqueueの運用
スレッドクラス名をキューに入れ、実行すると1つずつ取り出します.
この方法は,使用者に元の関数を修正し,元の関数にパラメータを伝達するように要求し,呼び出し方法も変更され,空きスレッドがリソースを浪費し,実際の操作では不便であるため,次のスレッドプールを設計した.
2.絶版スレッドプール
設計構想:キューqueueの運用
a.キューにタスクを入れるb.スレッドは何度もタスクを取りに行き、スレッドが空き次第タスクを取りに行く
まとめ
以上がPython探索のカスタム実装スレッドプールに関するすべての内容であり,皆さんの役に立つことを願っている.興味のある方は引き続き当駅:pythonのモジュールの__を参照してください.all__属性の詳細、Pythonのオブジェクト向けプログラミングの基礎解析(二)など、不足点があれば、コメントを歓迎します.友达の本駅に対する支持に感谢します!
タスクを使用してサブスレッド処理を開始し、処理が完了した後、サブスレッドを破棄したり、サブスレッドが自然に死亡したりすると、タスクにかかる時間が短くなりますが、タスクの数が多くなると、スレッドの作成と終了に多くの時間がかかり、効率が低下することを想定します.
スレッドプールの原理:
スレッドプール(Thread pool)である以上、名前はイメージ的で、指定された数の利用可能なサブスレッドを1つの「プール」に入れ、タスクがあるときに1つのスレッドを取り出して実行し、タスクが実行された後、すぐにスレッドを破棄するのではなく、スレッドプールに入れ、次のタスクの受信を待つ.これにより、メモリとcpuのオーバーヘッドも小さく、スレッドの数を制御できます.
スレッドプールの実装:
スレッドプールには多くの実装方法があり、pythonではQueue-キューという優れた実装方法が提供されています.pythonではQueue自体が同期しているので、つまりスレッドが安全なので、複数のスレッドに1つのQueueを共有させることが安心できます.
スレッドプールといえば、実行すべきタスクプールがあるはずです.タスクプールには実行すべきタスクが格納されており、各スレッドがタスクプールにタスク実行を取り、Queueでタスクプールを実現するのが最善です.
1.low版スレッドプール
設計構想:キューqueueの運用
スレッドクラス名をキューに入れ、実行すると1つずつ取り出します.
import queue
import threading
class ThreadPool(object):
def __init__(self, max_num=20):
self.queue = queue.Queue(max_num) # , 20
for i in range(max_num):
self.queue.put(threading.Thread) #
def get_thread(self):
return self.queue.get() #
def add_thread(self):
self.queue.put(threading.Thread) #
def func(arg, p): #
print(arg)
import time
time.sleep(2)
p.add_thread()
pool = ThreadPool(10) # , ,
for i in range(30):
thread = pool.get_thread() # get_thread ,
t = thread(target=func, args=(i, pool)) # , func, args
t.start()
この方法は,使用者に元の関数を修正し,元の関数にパラメータを伝達するように要求し,呼び出し方法も変更され,空きスレッドがリソースを浪費し,実際の操作では不便であるため,次のスレッドプールを設計した.
2.絶版スレッドプール
設計構想:キューqueueの運用
a.キューにタスクを入れるb.スレッドは何度もタスクを取りに行き、スレッドが空き次第タスクを取りに行く
import queue
import threading
import contextlib
import time
StopEvent = object()
class ThreadPool(object):
def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = []
def run(self, func, args, callback=None):
"""
:param func:
:param args:
:param callback: , 1、 ;2、 ( None, : )
:return: , True None
"""
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)
def generate_thread(self):
"""
"""
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
"""
current_thread = threading.currentThread()
self.generate_list.append(current_thread)
event = self.q.get()
while event != StopEvent:
func, args, callback = event
try:
result = func(*args)
success = True
except Exception as e:
success = False
result = None
if callback is not None:
try:
callback(success, result)
except Exception as e:
pass
with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else:
self.generate_list.remove(current_thread)
def close(self):
"""
,
"""
self.cancel = True
count = len(self.generate_list)
while count:
self.q.put(StopEvent)
count -= 1
def terminate(self):
"""
,
"""
self.terminal = True
while self.generate_list:
self.q.put(StopEvent)
self.q.queue.clear()
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
# How to use
pool = ThreadPool(5)
def callback(status, result):
# status, execute action status
# result, execute action return value
pass
def action(i):
print(i)
for i in range(30):
ret = pool.run(action, (i,), callback)
time.sleep(3)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
# pool.terminate()
まとめ
以上がPython探索のカスタム実装スレッドプールに関するすべての内容であり,皆さんの役に立つことを願っている.興味のある方は引き続き当駅:pythonのモジュールの__を参照してください.all__属性の詳細、Pythonのオブジェクト向けプログラミングの基礎解析(二)など、不足点があれば、コメントを歓迎します.友达の本駅に対する支持に感谢します!