pythonにおける生産者と消費者モデル


生産者と消費者モデル
要旨:最近、いくつかの生産者と消費者モデルを書いて、特にこの文を総括とする.異なる生産者と消費者の状況をまとめた.1人の生産者、複数の消費者、1人の生産者、複数の消費者のプログラミングモデル.
一.生産者と消費者
ソフトウェア開発の過程で、いくつかのモジュールがデータの生産を担当し、これらのデータは他のモジュールが処理を担当する(ここでのモジュールは関数、スレッド、プロセスなどである可能性がある).データを生成するモジュールを生産者と呼び,データを処理するモジュールを消費者と呼ぶ.
二.生産者と消費者モデルのメリット:
  • デカップリング
  • 合併
  • 閑忙不均
  • 一.マルチスレッド実装生産者と消費者モデルの例
  • 生産者と消費者
  • # !/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    """
    @author: Frank
    @contact: [email protected]
    @file: 18.py
    @time: 2018/7/10   8:59
    
         demon          
    
    """
    import time
    import queue
    
    import threading
    import random
    
    
    class Producer(threading.Thread):
        """
               
        """
        FINISHED = True
    
        def __init__(self, name, queue):
            # python3   
            super().__init__(name=name)
            self.queue = queue
    
        def run(self):
            for i in range(10):
                print("%s is producing %d to the queue!" % (self.getName(), i))
                self.queue.put(i)
                time.sleep(random.randint(1, 10) * 0.1)
    
            #         
            self.queue.put(self.FINISHED)
            print("%s finished!" % self.getName())
    
    
    class Consumer(threading.Thread):
        """
            ,
                
        """
        FINISHED = True
    
        def __init__(self, name, queue):
            super().__init__(name=name)
            self.queue = queue
    
        def run(self):
            while True:
                value = self.queue.get()
                #       
                if value is self.FINISHED:
                    break
                print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
    
            print("%s finished!" % self.getName())
    
    
    if __name__ == '__main__':
        queue = queue.Queue()
        producer = Producer('producer', queue)
    
        consumer = Consumer('consumer', queue)
    
        producer.start()
        consumer.start()
    
        consumer.join()
        producer.join()
    
        print('All threads  done')
    

    結果は次のとおりです.
    producer is producing 0 to the queue!
    consumer is consuming. 0 in the queue is consumed!
    producer is producing 1 to the queue!
    consumer is consuming. 1 in the queue is consumed!
    producer is producing 2 to the queue!
    consumer is consuming. 2 in the queue is consumed!
    producer is producing 3 to the queue!
    consumer is consuming. 3 in the queue is consumed!
    producer is producing 4 to the queue!
    consumer is consuming. 4 in the queue is consumed!
    producer is producing 5 to the queue!
    consumer is consuming. 5 in the queue is consumed!
    producer is producing 6 to the queue!
    consumer is consuming. 6 in the queue is consumed!
    producer is producing 7 to the queue!
    consumer is consuming. 7 in the queue is consumed!
    producer is producing 8 to the queue!
    consumer is consuming. 8 in the queue is consumed!
    producer is producing 9 to the queue!
    consumer is consuming. 9 in the queue is consumed!
    producer finished!
    consumer finished!
    All threads  done
    
    Process finished with exit code 0
    
    FINISHED                      .        ,               .
    
  • しかし、複数の消費者がいると問題が発生する.次のプレゼンテーションコードを見ると、消費者はここに詰まっています.
  • #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    """
    @author: Frank 
    @contact: [email protected]
    @file: 18.py
    @time: 2018/7/10   8:59
    
         demon          
    
    """
    import time
    from datetime import datetime, timedelta
    import queue
    
    import threading
    import random
    
    #   
    _sentinel = object()
    
    
    class Producer(threading.Thread):
        """
               
        """
    
        def __init__(self, name, queue):
            # python3   
            super().__init__(name=name)
            self.queue = queue
    
        def run(self):
            for i in range(5):
                print("%s is producing %d to the queue!" % (self.getName(), i))
                self.queue.put(i)
                time.sleep(random.randint(1, 10) * 0.1)
    
            #         
            self.queue.put(_sentinel)
            print("%s finished!" % self.getName())
    
    
    class Consumer(threading.Thread):
        """
            ,
                
        """
    
        def __init__(self, name, queue):
            super().__init__(name=name)
            self.queue = queue
    
        def run(self):
            while True:
                #         .
                value = self.queue.get()
                #       
                if value is _sentinel:
                    break
                print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
    
            print("%s finished!" % self.getName())
    
    
    if __name__ == '__main__':
        queue = queue.Queue()
        producer = Producer('producer', queue)
    
        consumer = Consumer('consumer', queue)
        consumer2 = Consumer('consumer2', queue)
    
        producer.start()
    
        consumer.start()
        consumer2.start()
    
        producer.join()
        consumer.join()
        consumer2.join()
    
        print('All threads  done')
    
    producer is producing 0 to the queue!
    consumer is consuming. 0 in the queue is consumed!
    producer is producing 1 to the queue!
    consumer is consuming. 1 in the queue is consumed!
    producer is producing 2 to the queue!
    consumer2 is consuming. 2 in the queue is consumed!
    producer is producing 3 to the queue!
    consumer is consuming. 3 in the queue is consumed!
    producer is producing 4 to the queue!
    consumer2 is consuming. 4 in the queue is consumed!
    producer finished!
    consumer finished!
    
    
    

    上は2番目の消費者のところに詰まっています.1番目の消費者がフラグを持って行ってスレッドを終了したからです.消費者2はキューが空いていることに気づき、そこで待っていたので、そのままここに詰まっていた.
  • 生産者と複数の消費者が現れるとき.

  • 解決方法は、各スレッドの消費が完了すると、フラグビットをキューに戻すことで、他の消費者スレッドを退けることができる.次の例を見てみましょう.
    #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    """
    @author: Frank 
    @contact: [email protected]
    @file: 18.py
    @time: 2018/7/10   8:59
    
         demon          
    
    """
    import time
    import queue
    
    import threading
    import random
    
    #   
    _sentinel = object()
    
    
    class Producer(threading.Thread):
        """
               
        """
    
        def __init__(self, name, queue):
            # python3   
            super().__init__(name=name)
            self.queue = queue
    
        def run(self):
            for i in range(15):
                print("%s is producing %d to the queue!" % (self.getName(), i))
                self.queue.put(i)
                time.sleep(random.randint(1, 20) * 0.1)
    
            #         
            self.queue.put(_sentinel)
            print("%s finished!" % self.getName())
    
    
    class Consumer(threading.Thread):
        """
            ,       .
        """
    
        def __init__(self, name, queue):
            super().__init__(name=name)
            self.queue = queue
    
        def run(self):
            while True:
                value = self.queue.get()
                #       
                if value is _sentinel:
                    #     ,           
                    self.queue.put(value)
                    break
                print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
    
            print("%s finished!" % self.getName())
    
    
    if __name__ == '__main__':
        queue = queue.Queue()
        producer = Producer('producer', queue)
    
        producer.start()
    
        consumer_threads = []
        for i in range(5):
            consumer = Consumer('consumer_' + str(i), queue)
            consumer_threads.append(consumer)
            consumer.start()
    
        producer.join()
        for consumer in consumer_threads:
            consumer.join()
    
        producer.join()
    
        print('All threads  done')
    
        :
    
    producer is producing 0 to the queue!
    consumer_0 is consuming. 0 in the queue is consumed!
    producer is producing 1 to the queue!
    consumer_0 is consuming. 1 in the queue is consumed!
    producer is producing 2 to the queue!
    consumer_1 is consuming. 2 in the queue is consumed!
    producer is producing 3 to the queue!
    consumer_2 is consuming. 3 in the queue is consumed!
    producer is producing 4 to the queue!
    consumer_3 is consuming. 4 in the queue is consumed!
    producer is producing 5 to the queue!
    consumer_4 is consuming. 5 in the queue is consumed!
    producer is producing 6 to the queue!
    consumer_0 is consuming. 6 in the queue is consumed!
    producer is producing 7 to the queue!
    consumer_1 is consuming. 7 in the queue is consumed!
    producer is producing 8 to the queue!
    consumer_2 is consuming. 8 in the queue is consumed!
    producer is producing 9 to the queue!
    consumer_3 is consuming. 9 in the queue is consumed!
    producer is producing 10 to the queue!
    consumer_4 is consuming. 10 in the queue is consumed!
    producer is producing 11 to the queue!
    consumer_0 is consuming. 11 in the queue is consumed!
    producer is producing 12 to the queue!
    consumer_1 is consuming. 12 in the queue is consumed!
    producer is producing 13 to the queue!
    consumer_2 is consuming. 13 in the queue is consumed!
    producer is producing 14 to the queue!
    consumer_3 is consuming. 14 in the queue is consumed!
    producer finished!
    consumer_4 finished!
    consumer_0 finished!
    consumer_1 finished!
    consumer_2 finished!
    consumer_3 finished!
    All threads  done
    
    Process finished with exit code 0
    
    

    複数の生産と複数の消費者モデルが現れると
    二.複数の生産者と複数の消費者モデル
  • 第1の実施形態
  • #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    """
    @author: Frank
    @contact: [email protected]
    @file: test_producer_consumer.py
    @time: 2018/8/31   9:55
    
         demon            
                
    
    2               .
    
    """
    
    import time
    import queue
    
    import threading
    import random
    
    from collections import deque
    
    #   
    _sentinel = object()
    _sentinel2 = object()
    
    
    class Producer(threading.Thread):
        """
               
        """
    
        def __init__(self, name, queue):
            # python3   
            super().__init__(name=name)
            self._queue = queue
    
        def run(self):
            for i in range(5):
                print("{} is producing {} to the queue!".format(self.getName(), i))
    
                self._queue.put(i)
                time.sleep(random.randint(1, 20) * 0.1)
    
            #         
            self._queue.put(_sentinel)
            print("%s finished!" % self.getName())
    
    
    class Producer2(Producer):
    
        def run(self):
            for i in range(65, 70):
                item = chr(i)
    
                print("{} is producing {} to the queue!".format(self.getName(), item))
                self._queue.put(item)
                time.sleep(random.randint(1, 20) * 0.8)
    
            #         
            self._queue.put(_sentinel2)
            print("%s finished!" % self.getName())
    
    
    class Consumer(threading.Thread):
        """
            
        """
    
        _deque = deque()
    
        def __init__(self, name, queue, lock):
            super().__init__(name=name)
            self._queue = queue
            self._lock = lock
    
        def run(self):
            while True:
                value = self._queue.get(block=True, timeout=10)
                #       
                if value in (_sentinel, _sentinel2):
                    with self._lock:
                        if value not in Consumer._deque:
                            Consumer._deque.append(value)
    
                    self._queue.put(value)
                    if len(Consumer._deque) == 2:
                        print('Consumer._deque  ==2  break')
                        break
                else:
                    print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
    
            print("{} finished!".format(self.getName()))
    
    
    if __name__ == '__main__':
    
        q = queue.Queue()
    
        lock = threading.Lock()
    
        sentienl_queue = queue.Queue()
    
        producer = Producer('producer111', q)
        producer2 = Producer2('producer222', q)
    
        producer2.start()
        producer.start()
    
        consumer_threads = []
        for i in range(5):
            consumer = Consumer('consumer_' + str(i), q, lock)
            consumer_threads.append(consumer)
            consumer.start()
    
        for consumer in consumer_threads:
            consumer.join()
    
        producer.join()
        producer2.join()
    
        print('All threads  done')
    
    

    結果は次のとおりです.
    producer222 is producing A to the queue!
    producer111 is producing 0 to the queue!
    consumer_0 is consuming. A in the queue is consumed!
    consumer_0 is consuming. 0 in the queue is consumed!
    producer111 is producing 1 to the queue!
    consumer_0 is consuming. 1 in the queue is consumed!
    producer111 is producing 2 to the queue!
    consumer_1 is consuming. 2 in the queue is consumed!
    producer111 is producing 3 to the queue!
    consumer_2 is consuming. 3 in the queue is consumed!
    producer111 is producing 4 to the queue!
    consumer_3 is consuming. 4 in the queue is consumed!
    producer111 finished!
    producer222 is producing B to the queue!
    consumer_4 is consuming. B in the queue is consumed!
    producer222 is producing C to the queue!
    consumer_4 is consuming. C in the queue is consumed!
    producer222 is producing D to the queue!
    consumer_1 is consuming. D in the queue is consumed!
    producer222 is producing E to the queue!
    consumer_0 is consuming. E in the queue is consumed!
    producer222 finished!
    Consumer._deque  ==2  break
    consumer_1 finished!
    Consumer._deque  ==2  break
    consumer_2 finished!
    Consumer._deque  ==2  break
    consumer_3 finished!
    Consumer._deque  ==2  break
    consumer_4 finished!
    Consumer._deque  ==2  break
    consumer_0 finished!
    All threads  done
    
    Process finished with exit code 0
    
    
  • 第2の実施形態
  • #!/usr/bin/env python3
    # -*- coding: UTF-8 -*-
    """
    @author: Frank
    @contact: [email protected]
    @file: test_producer_consumer.py
    @time: 2018/8/31   9:55
    
         demon            
                
    
    
    2               .
    
    """
    import time
    import queue
    
    import threading
    import random
    from collections import deque
    
    #              
    _sentinel = object()
    _sentinel2 = object()
    
    
    class Producer(threading.Thread):
        """
               
        """
    
        def __init__(self, name, queue):
            # python3   
            super().__init__(name=name)
            self._queue = queue
    
        def run(self):
            for i in range(5):
                print("{} is producing {} to the queue!".format(self.getName(), i))
    
                self._queue.put(i)
                time.sleep(0.1)
    
            #         
            self._queue.put(_sentinel)
            print("%s finished!" % self.getName())
    
    
    class Producer2(Producer):
    
        def run(self):
            for i in range(65, 70):
                item = chr(i)
    
                print("{} is producing {} to the queue!".format(self.getName(), item))
                self._queue.put(item)
                time.sleep(random.randint(1, 20) * 0.8)
    
            #         
            self._queue.put(_sentinel2)
            print("%s finished!" % self.getName())
    
    
    class Consumer(threading.Thread):
        """
            
        """
    
        _deque = deque()
    
        def __init__(self, name, queue):
            super().__init__(name=name)
            self._queue = queue
    
        @staticmethod
        def consume_data(datas):
            """
                     ,         ,      .       ,       .
            :param datas:
            :return: int
            """
    
            return 0
    
        def run(self):
            while True:
                try:
                    datas = self._queue.get(block=True, timeout=10)
                except queue.Empty:
                    # print(f"queue  is  empty. ")
                    datas = None
    
                #       
                if datas in (_sentinel, _sentinel2):
                    if datas not in Consumer._deque:
                        print(f'put {datas} into the  Consumer._deque')
                        Consumer._deque.append(datas)
                    if len(Consumer._deque) == 2:
                        print('Consumer._deque length == 2 ,break, current_thread:{} has finished'.format(self.getName()))
                        break
                else:
                    #     
                    if datas:
                        #     
                        sucess_number = self.consume_data(datas)
                        print("{} is consuming.  datas: {} in the queue is consumed! sucess_number:{}".format(self.getName(), datas, sucess_number))
                    else:
                        print('datas is  None...')
                        if len(Consumer._deque) == 2:
                            print('Consumer._deque length == 2 ,break, current_thread:{} has finished'.format(self.getName()))
                            break
    
    
    if __name__ == '__main__':
    
        q = queue.Queue()
    
        producer = Producer('producer111', q)
        producer2 = Producer2('producer222', q)
    
        producer.start()
        producer2.start()
    
        consumer_threads = []
        for i in range(5):
            consumer = Consumer('consumer_' + str(i), q)
            consumer_threads.append(consumer)
            consumer.start()
    
        for consumer in consumer_threads:
            consumer.join()
    
        producer2.join()
        producer.join()
    
        print('All threads  done')
    
    

    実行結果の一部を下図に示します.
    
    producer222 is producing D to the queue!
    consumer_0 is consuming.  datas: D in the queue is consumed! sucess_number:0
    datas is  None...
    datas is  None...
    datas is  None...
    datas is  None...
    datas is  None...
    datas is  None...
    datas is  None...
    datas is  None...
    datas is  None...
    producer222 is producing E to the queue!
    consumer_0 is consuming.  datas: E in the queue is consumed! sucess_number:0
    producer222 finished!
    put  into the  Consumer._deque
    Consumer._deque length == 2 ,break, current_thread:consumer_1 has finished
    datas is  None...
    Consumer._deque length == 2 ,break, current_thread:consumer_2 has finished
    datas is  None...
    Consumer._deque length == 2 ,break, current_thread:consumer_3 has finished
    datas is  None...
    Consumer._deque length == 2 ,break, current_thread:consumer_4 has finished
    datas is  None...
    Consumer._deque length == 2 ,break, current_thread:consumer_0 has finished
    All threads  done
    

    まとめ:本稿で簡単にまとめたPythonにおける消費者と生産者モデルというプログラミングモデルの書き方は、マルチスレッドによって生産者と消費者モデルを実現する.
    参考ドキュメントPythonマルチスレッドで生産者消費者モデルを実現https://segmentfault.com/a/1190000008909344
    分かち合う楽しみ、残して感动します.2018-09-30 22:46:42--frank