Pythonケース-ネットワークプログラミング-スレッドプール


スレッドプール:
python2.*本物のスレッドプールはありませんpython 3.*機能が弱いため、通常は自分で書くか、サードパーティのソフトウェアで以下の2つのカスタムスレッドプールコードを例に挙げます.
第1種:for循環配合キューによってマルチスレッドの特徴を実現する:1、キューにスレッドを格納する;2、スレッドの使用が終わったら閉じて、それから再創して、繰り返し使用できません;3、プログラムを開くと、スレッドプールは最大オンラインになります.
#!/usr/bin/env python
# -- coding = 'utf-8' --
# Author Allen Lee
# Python Version 3.5.1
# OS Windows 7
import queue
import threading
import time
class ThreadPool:
    def __init__(self,maxsize):
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)
    def get_thread(self):
        return self._q.get()
    def add_thread(self):
        self._q.put(threading.Thread)

pool = ThreadPool(5)

def task(arg,p):
    print(arg)
    time.sleep(1)
    p.add_thread

for i in range(100):
    #threading.Thread 
    t = pool.get_thread()
    obj = t(target=task,args=(i,pool,))
    obj.start()

第二種類:少し複雑な特徴:1、キューにタスクを保存する;2、任務は2種類に分けられる:a、真実の任務;b、閉鎖信号;3、現在のタスク量に基づいてスレッドを作成する.4、スレッドは繰り返し使用できる.
#!/usr/bin/env python
# -- coding = 'utf-8' --
# Author Allen Lee
# Python Version 3.5.1
# OS Windows 7
import threading
import time
import queue
import contextlib

stoptarget = object()

#        
class ThreadPool(object):
    #          ,        maxsize
    def __init__(self,maxsize,maxtasklist=None):
        if maxtasklist:
            self.q = queue.Queue(maxtasklist)
        else:
            self.q = queue.Queue(maxsize)
        self.maxsize = maxsize
        #            
        self.Nclose = False
        #       ,      
        self.Iclose = False
        #          
        self.inuse_list = []
        #      
        self.free_list = []

    def run(self,task,args,callback=None):
        """
        #task     
        #args     
        #callback          ,      :1、    ;2、   ,default=None,        
        #   ,       ,  True or None
        """

        #              ,  
        if self.Nclose:
            return
        #        ,              ,       ,               ,              
        if len(self.free_list) == 0 and len(self.inuse_list) < self.maxsize:
            self.inuse_thread()
        obj = (task,args,callback,)
        self.q.put(obj)

    #      
    def inuse_thread(self):
        opj = threading.Thread(target=self.call)
        opj.start()

    #         
    def call(self):
        current_thread = threading.currentThread
        #           
        self.inuse_list.append(current_thread)
        #       
        event = self.q.get()
        #            ,    ,         task,args,callback     
        while event != stoptarget:
            task,args,callback = event
            #    ,    task  ,       result,  success  True
            try:
                result = task(*args)
                success = True
            #       False None
            except Exception as f:
                success = False
                result = None
            #           success result       ,         
            if callback is not None:
                try:
                    callback(success, result)
                except Exception as f:
                    pass
            with self.worker_state(self.free_list, current_thread):
                #       ,       stoptarget   
                if self.Iclose:
                    event = stoptarget
                #        
                else:
                    event = self.q.get()
        else:
                #                   
                self.inuse_list.remove(current_thread)

    #         ,    
    def Nclose(self):
        self.Nclose = True
        full_size = len(self.inuse_list)
        while full_size:
            self.q.put(stoptarget)
            full_size -= 1

     #        ,    ,      
    def Iclose(self):
        self.Iclose = True
        while self.Iclose:
            self.q.put(stoptarget)
        self.q.empty()

    #               ,                    
    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        state_list.append(worker_thread)
        #     
        try:
            yield
        finally:
            state_list.remove(worker_thread)
#      5      
pool = ThreadPool(5)

#      
def callback(status,result):
    print(status,result)

#     task
def action(i):
    print(i)

#100     
for i in range(100):
    t = pool.run(action,(i,),callback)

time.sleep(2)
print(len(pool.inuse_list), len(pool.free_list))