Python探索のカスタム実装スレッドプール

5156 ワード

スレッドプールが必要なのはなぜですか?
タスクを使用してサブスレッド処理を開始し、処理が完了した後、サブスレッドを破棄したり、サブスレッドが自然に死亡したりすると、タスクにかかる時間が短くなりますが、タスクの数が多くなると、スレッドの作成と終了に多くの時間がかかり、効率が低下することを想定します.
スレッドプールの原理:
スレッドプール(Thread pool)である以上、名前はイメージ的で、指定された数の利用可能なサブスレッドを1つの「プール」に入れ、タスクがあるときに1つのスレッドを取り出して実行し、タスクが実行された後、すぐにスレッドを破棄するのではなく、スレッドプールに入れ、次のタスクの受信を待つ.これにより、メモリとcpuのオーバーヘッドも小さく、スレッドの数を制御できます.
スレッドプールの実装:
スレッドプールには多くの実装方法があり、pythonではQueue-キューという優れた実装方法が提供されています.pythonではQueue自体が同期しているので、つまりスレッドが安全なので、複数のスレッドに1つのQueueを共有させることが安心できます.
スレッドプールといえば、実行すべきタスクプールがあるはずです.タスクプールには実行すべきタスクが格納されており、各スレッドがタスクプールにタスク実行を取り、Queueでタスクプールを実現するのが最善です.
1.low版スレッドプール
設計構想:キューqueueの運用
スレッドクラス名をキューに入れ、実行すると1つずつ取り出します.

import queue
import threading
class ThreadPool(object):
  def __init__(self, max_num=20):
    self.queue = queue.Queue(max_num) #    ,    20
    for i in range(max_num):
      self.queue.put(threading.Thread) #        
  def get_thread(self):
    return self.queue.get() #        
  def add_thread(self):
    self.queue.put(threading.Thread) #        
def func(arg, p): #      
  print(arg)
  import time
  time.sleep(2)
  p.add_thread()
pool = ThreadPool(10) #    ,          ,            
for i in range(30):
  thread = pool.get_thread() #      get_thread  ,    
  t = thread(target=func, args=(i, pool)) #    ,  func,   args 
  t.start()

この方法は,使用者に元の関数を修正し,元の関数にパラメータを伝達するように要求し,呼び出し方法も変更され,空きスレッドがリソースを浪費し,実際の操作では不便であるため,次のスレッドプールを設計した.
2.絶版スレッドプール
設計構想:キューqueueの運用
a.キューにタスクを入れるb.スレッドは何度もタスクを取りに行き、スレッドが空き次第タスクを取りに行く

import queue
import threading
import contextlib
import time
StopEvent = object()
class ThreadPool(object):
  def __init__(self, max_num, max_task_num = None):
    if max_task_num:
      self.q = queue.Queue(max_task_num)
    else:
      self.q = queue.Queue()
    self.max_num = max_num
    self.cancel = False
    self.terminal = False
    self.generate_list = []
    self.free_list = []
  def run(self, func, args, callback=None):
    """
             
    :param func:     
    :param args:         
    :param callback:                  ,         1、        ;2、       (   None, :       )
    :return:          ,   True  None
    """
    if self.cancel:
      return
    if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
      self.generate_thread()
    w = (func, args, callback,)
    self.q.put(w)
  def generate_thread(self):
    """
          
    """
    t = threading.Thread(target=self.call)
    t.start()
  def call(self):
    """
                    
    """
    current_thread = threading.currentThread()
    self.generate_list.append(current_thread)
    event = self.q.get()
    while event != StopEvent:
      func, args, callback = event
      try:
        result = func(*args)
        success = True
      except Exception as e:
        success = False
        result = None
      if callback is not None:
        try:
          callback(success, result)
        except Exception as e:
          pass
      with self.worker_state(self.free_list, current_thread):
        if self.terminal:
          event = StopEvent
        else:
          event = self.q.get()
    else:
      self.generate_list.remove(current_thread)
  def close(self):
    """
             ,      
    """
    self.cancel = True
    count = len(self.generate_list)
    while count:
      self.q.put(StopEvent)
      count -= 1
  def terminate(self):
    """
            ,    
    """
    self.terminal = True
    while self.generate_list:
      self.q.put(StopEvent)
    self.q.queue.clear()
  @contextlib.contextmanager
  def worker_state(self, state_list, worker_thread):
    """
                   
    """
    state_list.append(worker_thread)
    try:
      yield
    finally:
      state_list.remove(worker_thread)
# How to use
pool = ThreadPool(5)
def callback(status, result):
  # status, execute action status
  # result, execute action return value
  pass
def action(i):
  print(i)
for i in range(30):
  ret = pool.run(action, (i,), callback)
time.sleep(3)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
# pool.terminate()

まとめ
以上がPython探索のカスタム実装スレッドプールに関するすべての内容であり,皆さんの役に立つことを願っている.興味のある方は引き続き当駅:pythonのモジュールの__を参照してください.all__属性の詳細、Pythonのオブジェクト向けプログラミングの基礎解析(二)など、不足点があれば、コメントを歓迎します.友达の本駅に対する支持に感谢します!