Pythonマルチスレッドとマルチプロセス(3)スレッドの同期の条件変数


このシリーズの記事ディレクトリ
展開/終了
  • Pythonマルチスレッドとマルチプロセス(一)GILロックとThreadを使用したマルチスレッド
  • の作成
  • Pythonマルチスレッドとマルチプロセス(二)スレッドの同期の反発ロックと再入ロック
  • Pythonマルチスレッドとマルチプロセス(3)スレッドの同期の条件変数
  • Pythonマルチスレッドとマルチプロセス(4)スレッドの同期信号量
  • Pythonマルチスレッドとマルチプロセス(5)マルチスレッド管理-スレッドプール
  • Pythonマルチスレッドとマルチプロセス(六)マルチプロセスプログラミングと同期
  • 同期方式2:条件変数
    まず、条件変数はマルチスレッド競合の共有リソースであるため、反発ロックと組み合わせて使用する必要があります.条件変数により待機と通知のメカニズムを実現できる.
    最も基本的な使用方法は次のとおりです.
    cond = Condition() #         
    cond.acquire()   #        
    
    cond.wait()     #   ,          ,       notify        
    do_something()   
    cond.notify()    #               cond   
    cond.release()
    

     
    そのうち:cond.acquire() 実は、lock.acquire()cond.wait()に相当する反発ロックで鍵をかけます.    3つのことができます:1.反発ロックを解放し、同じ条件変数を使用した他のスレッドがロックを取得して実行できるようにします.2.ブロックスリープ状態に入り、CPUを逃がす.3.本スレッドのwait()が他のスレッドnotifyによって起動されると、wait()はロックを再取得します.cond.notify()    他のスレッドに通知し、他のスレッドのwait()を起動します.cond.release()  反発ロックの解除
    wait()とrelease()はロック内で実行する必要があります.ロックを取得せずにwait()とrelease()を直接呼び出すとエラーが表示されます
    PS:使用  with cond:     if my_condition:         cond.wait()     do_something()     cond.notify()      および  cond.acquire() if my_condition:     cond.wait() do_something() cond.notify() cond.release()
    同じです.
    withを使うのはコードの外にロックをかけたのと同じです.
    例1:マルチスレッドの秩序ある実行を制御する2人のA,Bが会話を行い,それぞれ1~6という数字を言い,Aに先に言うように要求し,AがBを言ってから言うことができ,BがAを言ってからA:1 B:2 A:3 B:4 A:5 B:6と言うことができる
    from threading import Condition,Thread
    
    cond = Condition()
    a_say = [1,3,5]
    b_say = [2,4,6]
    
    class A(Thread):
      def __init__(self,cond,say):
        super(A,self).__init__(name="A")
        self.cond = cond
        self.say = say
    
      def run(self):
        self.cond.acquire()
        for i in range(len(self.say)):
          print("%s say %d" % (self.name,self.say.pop(0)))
          self.cond.notify() # A      B, B   
    
          if len(self.say):
            self.cond.wait()  # A       ,    B  , B   A,A     
    
        self.cond.release()
    
    
    class B(Thread):
      def __init__(self,cond,say):
        super(B,self).__init__(name="B")
        self.cond = cond
        self.say = say
    
      def run(self):
        self.cond.acquire()
        for i in range(len(self.say)):
          self.cond.wait()  #     A     B  ,     B       
          print("%s say %d" % (self.name,self.say.pop(0)))
          self.cond.notify() # B      A, A   
    
        self.cond.release()
    
    if __name__=="__main__":
      a = A(cond,a_say)
      b = B(cond,b_say)
    
      b.start()    #    b     ,a   ,  a   ,  a  b    wait()      notify(),    notify()       。  a  wait().b   wait()       ,         
      a.start()
    

             
    ここでは条件変数を用いて実現した
    もちろん,複数の反発ロックを用いてマルチスレッドの秩序化実行を実現することもでき,前述の反発ロックの例で示した.
     
    例2:相互反発ロック、条件変数、リストを使用して、スレッドの安全なキューを実現します. 
    # coding=utf-8
    
    from threading import Thread
    from threading import Lock,Condition
    import random
    
    class ThreadSafeQueue:
      def __init__(self,max_size=0,blocking=True,timeout=None): #             
        self.max_size=max_size
        self.blocking=blocking #       
        self.timeout=timeout  #          
        self.lock = Lock()
        self.cond = Condition(lock=self.lock) #                    ,    Condition        
        self.queue = []
    
      def size(self):   # self.queue       ,    self.queue       ,     
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
    
        return size
    
      def batch_push(self,items):
        if not isinstance(items,list):
          items=list(items)
        for item in items:
          self.push(item)
    
      def push(self,item):
        self.cond.acquire()
        while self.max_size>0 and len(self.queue)>=self.max_size:
          if self.blocking:
            res = self.cond.wait(timeout=self.timeout)  #     timeout     ,   False
           
            if not res:
              self.cond.release()
              return False
          else:
            self.cond.release()
            return False
    
        self.queue.append(item)
        self.cond.notify()
        self.cond.release()
    
    
        return True
    
      def pop(self):
        self.cond.acquire()
        while len(self.queue)<=0:
          if self.blocking:
            res=self.cond.wait(timeout=self.timeout)
           
            if not res:
              self.cond.release()
              return False
          else:
            self.cond.release()
            return False
    
        item = self.queue.pop()
        self.cond.notify()     #            
        self.cond.release()
    
        return item
    
      def get(self,index):
        self.lock.acquire()
        try:
          item = self.queue[index]
        except:
          item=None
        self.lock.release()
    
        return item
    
    #    
    def produce(q,n):
      for i in range(100000):
        q.push(i)
        print("Thread %d push %d" % (n,i))
    
    def consumer(q,n):
      count_none = 0 #   q.pop()      10   while  
      while True:
        item = q.pop()
        if item is False:
          count_none+=1
        else:
          count_none=0
          print("Thread %d pop %d" % (n,item))
    
        if count_none>=10:
          break
    
    
    #   
    if __name__=="__main__":
      queue = ThreadSafeQueue(1000)    #       ,   ,                    ,        
      # queue = ThreadSafeQueue(1000,timeout=1)    #       ,   ,                    ,  10        
      # queue = ThreadSafeQueue(1000,blocking=False)  #        ,   ,                     ,               
    
      #          ,       ,                  ,          ,        
      t1 = Thread(target=produce,args=(queue,1))
      t2 = Thread(target=produce,args=(queue,2))
      t3 = Thread(target=consumer,args=(queue,3))
      t1.start()
      t2.start()
      t3.start()
      t1.join()
      t2.join()
      t3.join()
     
    
    

                
    次はConditionの下位層がどのように実現されているかを説明します:1.Conditionをインスタンス化するとき、Conditionの_init__条件変数オブジェクトの使用を保護するためのRLock再読み込みロックが生成されます.この錠をRと呼ぶ
    2.wait()を実行する前にcond.acquire()を実行して条件変数に鍵をかけなければならない.上の鍵はRである.  wait()を実行するとき、wait()はいくつかのことをしました.  2−1.wait()は反発ロックを作成し、この反発ロックをXと呼び、Xに対してacquire()ロック:X.acquire()を呼び出し、その後、ロックXを双方向キューQに配置する.  2-2.wait()ロックRを解放し、他のスレッドがロックRを取得し、いくつかのタスクコードを実行できるようにする  2-3.wait()はRを解放した後、Xに対してもう一度ロックします.X.acquire()Xに2回連続してロックするため、デッドロックが発生し、wait()がブロック状態に入る.  だから条件変数のwait()はデッドロック方式でブロック待ちの機能を実現します!!     2-4.wait()のロックXが他のスレッドのnotify()によって解放されると、Rに再ロックされ、ロックXは二度と使用されません.次回wait()を呼び出すと新しいXロックが生成されます   3.他のスレッドはロックRを取得し、いくつかのタスクコードを実行した後、notify()を実行して前のスレッドのwait()を起動する  notify()はいくつかのことをしました.  3−1.キューQヘッダからロックXをポップアップし、ロックXを解放する.ロックXを解放することによってwait()を起動する.その後この錠Xは永遠に使われません      まとめ:Conditionの実装には2つのロックが使われています:_init__()時に作成される再ロックRとwait()は、条件変数と共有変数を保護するために作成された反発ロックX RのスレッドセキュリティXであり、スレッドセキュリティではなく、デッドロックを製造してブロック効果を達成するために使用される
    Rは繰り返し使用され、Xは使い捨てで、毎回新しいXが生成されます
    以下にCondition中__を貼り付けますinit__,wait()とnotify()のソース: 
    class Condition:
     
    
      def __init__(self, lock=None):
        if lock is None:
          lock = RLock()   #         R。       lock         lock。
        self._lock = lock
    
        self.acquire = lock.acquire
        self.release = lock.release
       
        try:
          self._release_save = lock._release_save
        except AttributeError:
          pass
        try:
          self._acquire_restore = lock._acquire_restore
        except AttributeError:
          pass
        try:
          self._is_owned = lock._is_owned
        except AttributeError:
          pass
        self._waiters = _deque()
    
     
    
      def wait(self, timeout=None):
        if not self._is_owned():
          raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()    ####     X ####
        waiter.acquire()        ####   X    ####
        self._waiters.append(waiter)  ####   X       Q ####
        saved_state = self._release_save()   ####     R ####
        gotit = False
        try:  
          if timeout is None:
            waiter.acquire()    ####      X      ,    ,      ####
            gotit = True
          else:
            if timeout > 0:
              gotit = waiter.acquire(True, timeout)
            else:
              gotit = waiter.acquire(False)
          return gotit
        finally:
          self._acquire_restore(saved_state)   #### X    , R     ####
          if not gotit:
            try:
              self._waiters.remove(waiter)
            except ValueError:
              pass
    
     
      def notify(self, n=1):
        if not self._is_owned():
          raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
          return
        for waiter in waiters_to_notify:
          waiter.release()    ####     X ####
          try:
            all_waiters.remove(waiter)  ####   X   Q   ####
          except ValueError:
            pass
    
     
    

        Pythonマルチスレッドとマルチプロセス(三)スレッドの同期の条件変数