pythonにおける生産者と消費者モデル
18229 ワード
生産者と消費者モデル
要旨:最近、いくつかの生産者と消費者モデルを書いて、特にこの文を総括とする.異なる生産者と消費者の状況をまとめた.1人の生産者、複数の消費者、1人の生産者、複数の消費者のプログラミングモデル.
一.生産者と消費者
ソフトウェア開発の過程で、いくつかのモジュールがデータの生産を担当し、これらのデータは他のモジュールが処理を担当する(ここでのモジュールは関数、スレッド、プロセスなどである可能性がある).データを生成するモジュールを生産者と呼び,データを処理するモジュールを消費者と呼ぶ.
二.生産者と消費者モデルのメリット:デカップリング 合併 閑忙不均 一.マルチスレッド実装生産者と消費者モデルの例生産者と消費者
結果は次のとおりです.しかし、複数の消費者がいると問題が発生する.次のプレゼンテーションコードを見ると、消費者はここに詰まっています.
上は2番目の消費者のところに詰まっています.1番目の消費者がフラグを持って行ってスレッドを終了したからです.消費者2はキューが空いていることに気づき、そこで待っていたので、そのままここに詰まっていた.生産者と複数の消費者が現れるとき.
解決方法は、各スレッドの消費が完了すると、フラグビットをキューに戻すことで、他の消費者スレッドを退けることができる.次の例を見てみましょう.
複数の生産と複数の消費者モデルが現れると
二.複数の生産者と複数の消費者モデル第1の実施形態
結果は次のとおりです.第2の実施形態
実行結果の一部を下図に示します.
まとめ:本稿で簡単にまとめたPythonにおける消費者と生産者モデルというプログラミングモデルの書き方は、マルチスレッドによって生産者と消費者モデルを実現する.
参考ドキュメントPythonマルチスレッドで生産者消費者モデルを実現https://segmentfault.com/a/1190000008909344
分かち合う楽しみ、残して感动します.2018-09-30 22:46:42--frank
要旨:最近、いくつかの生産者と消費者モデルを書いて、特にこの文を総括とする.異なる生産者と消費者の状況をまとめた.1人の生産者、複数の消費者、1人の生産者、複数の消費者のプログラミングモデル.
一.生産者と消費者
ソフトウェア開発の過程で、いくつかのモジュールがデータの生産を担当し、これらのデータは他のモジュールが処理を担当する(ここでのモジュールは関数、スレッド、プロセスなどである可能性がある).データを生成するモジュールを生産者と呼び,データを処理するモジュールを消費者と呼ぶ.
二.生産者と消費者モデルのメリット:
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: [email protected]
@file: 18.py
@time: 2018/7/10 8:59
demon
"""
import time
import queue
import threading
import random
class Producer(threading.Thread):
"""
"""
FINISHED = True
def __init__(self, name, queue):
# python3
super().__init__(name=name)
self.queue = queue
def run(self):
for i in range(10):
print("%s is producing %d to the queue!" % (self.getName(), i))
self.queue.put(i)
time.sleep(random.randint(1, 10) * 0.1)
#
self.queue.put(self.FINISHED)
print("%s finished!" % self.getName())
class Consumer(threading.Thread):
"""
,
"""
FINISHED = True
def __init__(self, name, queue):
super().__init__(name=name)
self.queue = queue
def run(self):
while True:
value = self.queue.get()
#
if value is self.FINISHED:
break
print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
print("%s finished!" % self.getName())
if __name__ == '__main__':
queue = queue.Queue()
producer = Producer('producer', queue)
consumer = Consumer('consumer', queue)
producer.start()
consumer.start()
consumer.join()
producer.join()
print('All threads done')
結果は次のとおりです.
producer is producing 0 to the queue!
consumer is consuming. 0 in the queue is consumed!
producer is producing 1 to the queue!
consumer is consuming. 1 in the queue is consumed!
producer is producing 2 to the queue!
consumer is consuming. 2 in the queue is consumed!
producer is producing 3 to the queue!
consumer is consuming. 3 in the queue is consumed!
producer is producing 4 to the queue!
consumer is consuming. 4 in the queue is consumed!
producer is producing 5 to the queue!
consumer is consuming. 5 in the queue is consumed!
producer is producing 6 to the queue!
consumer is consuming. 6 in the queue is consumed!
producer is producing 7 to the queue!
consumer is consuming. 7 in the queue is consumed!
producer is producing 8 to the queue!
consumer is consuming. 8 in the queue is consumed!
producer is producing 9 to the queue!
consumer is consuming. 9 in the queue is consumed!
producer finished!
consumer finished!
All threads done
Process finished with exit code 0
FINISHED . , .
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: [email protected]
@file: 18.py
@time: 2018/7/10 8:59
demon
"""
import time
from datetime import datetime, timedelta
import queue
import threading
import random
#
_sentinel = object()
class Producer(threading.Thread):
"""
"""
def __init__(self, name, queue):
# python3
super().__init__(name=name)
self.queue = queue
def run(self):
for i in range(5):
print("%s is producing %d to the queue!" % (self.getName(), i))
self.queue.put(i)
time.sleep(random.randint(1, 10) * 0.1)
#
self.queue.put(_sentinel)
print("%s finished!" % self.getName())
class Consumer(threading.Thread):
"""
,
"""
def __init__(self, name, queue):
super().__init__(name=name)
self.queue = queue
def run(self):
while True:
# .
value = self.queue.get()
#
if value is _sentinel:
break
print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
print("%s finished!" % self.getName())
if __name__ == '__main__':
queue = queue.Queue()
producer = Producer('producer', queue)
consumer = Consumer('consumer', queue)
consumer2 = Consumer('consumer2', queue)
producer.start()
consumer.start()
consumer2.start()
producer.join()
consumer.join()
consumer2.join()
print('All threads done')
producer is producing 0 to the queue!
consumer is consuming. 0 in the queue is consumed!
producer is producing 1 to the queue!
consumer is consuming. 1 in the queue is consumed!
producer is producing 2 to the queue!
consumer2 is consuming. 2 in the queue is consumed!
producer is producing 3 to the queue!
consumer is consuming. 3 in the queue is consumed!
producer is producing 4 to the queue!
consumer2 is consuming. 4 in the queue is consumed!
producer finished!
consumer finished!
上は2番目の消費者のところに詰まっています.1番目の消費者がフラグを持って行ってスレッドを終了したからです.消費者2はキューが空いていることに気づき、そこで待っていたので、そのままここに詰まっていた.
解決方法は、各スレッドの消費が完了すると、フラグビットをキューに戻すことで、他の消費者スレッドを退けることができる.次の例を見てみましょう.
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: [email protected]
@file: 18.py
@time: 2018/7/10 8:59
demon
"""
import time
import queue
import threading
import random
#
_sentinel = object()
class Producer(threading.Thread):
"""
"""
def __init__(self, name, queue):
# python3
super().__init__(name=name)
self.queue = queue
def run(self):
for i in range(15):
print("%s is producing %d to the queue!" % (self.getName(), i))
self.queue.put(i)
time.sleep(random.randint(1, 20) * 0.1)
#
self.queue.put(_sentinel)
print("%s finished!" % self.getName())
class Consumer(threading.Thread):
"""
, .
"""
def __init__(self, name, queue):
super().__init__(name=name)
self.queue = queue
def run(self):
while True:
value = self.queue.get()
#
if value is _sentinel:
# ,
self.queue.put(value)
break
print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
print("%s finished!" % self.getName())
if __name__ == '__main__':
queue = queue.Queue()
producer = Producer('producer', queue)
producer.start()
consumer_threads = []
for i in range(5):
consumer = Consumer('consumer_' + str(i), queue)
consumer_threads.append(consumer)
consumer.start()
producer.join()
for consumer in consumer_threads:
consumer.join()
producer.join()
print('All threads done')
:
producer is producing 0 to the queue!
consumer_0 is consuming. 0 in the queue is consumed!
producer is producing 1 to the queue!
consumer_0 is consuming. 1 in the queue is consumed!
producer is producing 2 to the queue!
consumer_1 is consuming. 2 in the queue is consumed!
producer is producing 3 to the queue!
consumer_2 is consuming. 3 in the queue is consumed!
producer is producing 4 to the queue!
consumer_3 is consuming. 4 in the queue is consumed!
producer is producing 5 to the queue!
consumer_4 is consuming. 5 in the queue is consumed!
producer is producing 6 to the queue!
consumer_0 is consuming. 6 in the queue is consumed!
producer is producing 7 to the queue!
consumer_1 is consuming. 7 in the queue is consumed!
producer is producing 8 to the queue!
consumer_2 is consuming. 8 in the queue is consumed!
producer is producing 9 to the queue!
consumer_3 is consuming. 9 in the queue is consumed!
producer is producing 10 to the queue!
consumer_4 is consuming. 10 in the queue is consumed!
producer is producing 11 to the queue!
consumer_0 is consuming. 11 in the queue is consumed!
producer is producing 12 to the queue!
consumer_1 is consuming. 12 in the queue is consumed!
producer is producing 13 to the queue!
consumer_2 is consuming. 13 in the queue is consumed!
producer is producing 14 to the queue!
consumer_3 is consuming. 14 in the queue is consumed!
producer finished!
consumer_4 finished!
consumer_0 finished!
consumer_1 finished!
consumer_2 finished!
consumer_3 finished!
All threads done
Process finished with exit code 0
複数の生産と複数の消費者モデルが現れると
二.複数の生産者と複数の消費者モデル
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: [email protected]
@file: test_producer_consumer.py
@time: 2018/8/31 9:55
demon
2 .
"""
import time
import queue
import threading
import random
from collections import deque
#
_sentinel = object()
_sentinel2 = object()
class Producer(threading.Thread):
"""
"""
def __init__(self, name, queue):
# python3
super().__init__(name=name)
self._queue = queue
def run(self):
for i in range(5):
print("{} is producing {} to the queue!".format(self.getName(), i))
self._queue.put(i)
time.sleep(random.randint(1, 20) * 0.1)
#
self._queue.put(_sentinel)
print("%s finished!" % self.getName())
class Producer2(Producer):
def run(self):
for i in range(65, 70):
item = chr(i)
print("{} is producing {} to the queue!".format(self.getName(), item))
self._queue.put(item)
time.sleep(random.randint(1, 20) * 0.8)
#
self._queue.put(_sentinel2)
print("%s finished!" % self.getName())
class Consumer(threading.Thread):
"""
"""
_deque = deque()
def __init__(self, name, queue, lock):
super().__init__(name=name)
self._queue = queue
self._lock = lock
def run(self):
while True:
value = self._queue.get(block=True, timeout=10)
#
if value in (_sentinel, _sentinel2):
with self._lock:
if value not in Consumer._deque:
Consumer._deque.append(value)
self._queue.put(value)
if len(Consumer._deque) == 2:
print('Consumer._deque ==2 break')
break
else:
print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))
print("{} finished!".format(self.getName()))
if __name__ == '__main__':
q = queue.Queue()
lock = threading.Lock()
sentienl_queue = queue.Queue()
producer = Producer('producer111', q)
producer2 = Producer2('producer222', q)
producer2.start()
producer.start()
consumer_threads = []
for i in range(5):
consumer = Consumer('consumer_' + str(i), q, lock)
consumer_threads.append(consumer)
consumer.start()
for consumer in consumer_threads:
consumer.join()
producer.join()
producer2.join()
print('All threads done')
結果は次のとおりです.
producer222 is producing A to the queue!
producer111 is producing 0 to the queue!
consumer_0 is consuming. A in the queue is consumed!
consumer_0 is consuming. 0 in the queue is consumed!
producer111 is producing 1 to the queue!
consumer_0 is consuming. 1 in the queue is consumed!
producer111 is producing 2 to the queue!
consumer_1 is consuming. 2 in the queue is consumed!
producer111 is producing 3 to the queue!
consumer_2 is consuming. 3 in the queue is consumed!
producer111 is producing 4 to the queue!
consumer_3 is consuming. 4 in the queue is consumed!
producer111 finished!
producer222 is producing B to the queue!
consumer_4 is consuming. B in the queue is consumed!
producer222 is producing C to the queue!
consumer_4 is consuming. C in the queue is consumed!
producer222 is producing D to the queue!
consumer_1 is consuming. D in the queue is consumed!
producer222 is producing E to the queue!
consumer_0 is consuming. E in the queue is consumed!
producer222 finished!
Consumer._deque ==2 break
consumer_1 finished!
Consumer._deque ==2 break
consumer_2 finished!
Consumer._deque ==2 break
consumer_3 finished!
Consumer._deque ==2 break
consumer_4 finished!
Consumer._deque ==2 break
consumer_0 finished!
All threads done
Process finished with exit code 0
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: [email protected]
@file: test_producer_consumer.py
@time: 2018/8/31 9:55
demon
2 .
"""
import time
import queue
import threading
import random
from collections import deque
#
_sentinel = object()
_sentinel2 = object()
class Producer(threading.Thread):
"""
"""
def __init__(self, name, queue):
# python3
super().__init__(name=name)
self._queue = queue
def run(self):
for i in range(5):
print("{} is producing {} to the queue!".format(self.getName(), i))
self._queue.put(i)
time.sleep(0.1)
#
self._queue.put(_sentinel)
print("%s finished!" % self.getName())
class Producer2(Producer):
def run(self):
for i in range(65, 70):
item = chr(i)
print("{} is producing {} to the queue!".format(self.getName(), item))
self._queue.put(item)
time.sleep(random.randint(1, 20) * 0.8)
#
self._queue.put(_sentinel2)
print("%s finished!" % self.getName())
class Consumer(threading.Thread):
"""
"""
_deque = deque()
def __init__(self, name, queue):
super().__init__(name=name)
self._queue = queue
@staticmethod
def consume_data(datas):
"""
, , . , .
:param datas:
:return: int
"""
return 0
def run(self):
while True:
try:
datas = self._queue.get(block=True, timeout=10)
except queue.Empty:
# print(f"queue is empty. ")
datas = None
#
if datas in (_sentinel, _sentinel2):
if datas not in Consumer._deque:
print(f'put {datas} into the Consumer._deque')
Consumer._deque.append(datas)
if len(Consumer._deque) == 2:
print('Consumer._deque length == 2 ,break, current_thread:{} has finished'.format(self.getName()))
break
else:
#
if datas:
#
sucess_number = self.consume_data(datas)
print("{} is consuming. datas: {} in the queue is consumed! sucess_number:{}".format(self.getName(), datas, sucess_number))
else:
print('datas is None...')
if len(Consumer._deque) == 2:
print('Consumer._deque length == 2 ,break, current_thread:{} has finished'.format(self.getName()))
break
if __name__ == '__main__':
q = queue.Queue()
producer = Producer('producer111', q)
producer2 = Producer2('producer222', q)
producer.start()
producer2.start()
consumer_threads = []
for i in range(5):
consumer = Consumer('consumer_' + str(i), q)
consumer_threads.append(consumer)
consumer.start()
for consumer in consumer_threads:
consumer.join()
producer2.join()
producer.join()
print('All threads done')
実行結果の一部を下図に示します.
producer222 is producing D to the queue!
consumer_0 is consuming. datas: D in the queue is consumed! sucess_number:0
datas is None...
datas is None...
datas is None...
datas is None...
datas is None...
datas is None...
datas is None...
datas is None...
datas is None...
producer222 is producing E to the queue!
consumer_0 is consuming. datas: E in the queue is consumed! sucess_number:0
producer222 finished!
put
まとめ:本稿で簡単にまとめたPythonにおける消費者と生産者モデルというプログラミングモデルの書き方は、マルチスレッドによって生産者と消費者モデルを実現する.
参考ドキュメントPythonマルチスレッドで生産者消費者モデルを実現https://segmentfault.com/a/1190000008909344
分かち合う楽しみ、残して感动します.2018-09-30 22:46:42--frank