Pythonのスレッド間の協調作業にはQueueを使う
はじめに
Twitterで一時期流行していた 100 Days Of Code なるものを先日知りました。本記事は、初学者である私が100日の学習を通してどの程度成長できるか記録を残すこと、アウトプットすることを目的とします。誤っている点、読みにくい点多々あると思います。ご指摘いただけると幸いです!
今回学習する教材
-
- 8章構成
- 本章216ページ
第5章:並行性と並列性
スレッド間の協調作業にはQueueを使う
- 8章構成
- 本章216ページ
第5章:並行性と並列性
並行作業を行うのに役立つ方法として関数パイプラインがあります。
パイプラインは、製造における組み立てラインのように働きます。
このような動作を実現するためにqueueを使います。
例として、デジタルカメラから一連の画像を取り出し、サイズを変更して、オンラインのフォトギャラリーに投稿するというシステムを考えます。
このシステムは、3段に分けたパイプラインで作れます。
第1段階では、新たな画像を取り出します。そのダウンロードした画像を第2段階のサイズ変更関数に引き渡します。サイズ変更した写真が、第3段階で使われてアップロードされます。最後に、すべての処理が終了したデータをdoneに格納します。
このプログラムのイメージは以下のようになります。
並行処理プログラムが第1段階から第3段階までの処理をチェックし、プログラムを処理します。
実は、自作のQueueクラスでこのプログラムを単純に実装しようとすると、多くの問題に直面します。
まず初めに、プログラムの終了条件を確認するために、doneのqueueをビジーウェイト(常に状態を確認すること)する必要があります。
第二に、前半のパイプラインが渋滞すると、後半のパイプラインは空になる頻度が高くなります。そうすると、後半のスレッドは空のキューをチェックするだけの、無駄な仕事をすることになります。
第三に、すべての仕事が終了した際に、各スレッドにこのループを終了したことを伝える必要があります。
単純な実装では、仕事がすべて終わっても download, resize, upload は終わったことに気付かず、ループを抜け出せない状態になります。
第四に、パイプラインの重体によって、プログラムがどこかでクラッシュする可能性があります。
Pythonの組み込みモジュールqueueのQueueクラスは、このような多くの問題をすべて解決する機能を持っています。
Queueは新たなデータが得られるまでgetメソッドをブロックすることで、ビジーウェイトを使わずに処理できます。
from threading import Thread
from queue import Queue
queue = Queue()
def consumer():
print('consumer waiting')
queue.get() # put() の後で実行される
print('Consumer done')
thread = Thread(target=consumer)
thread.start()
print('Producer putting')
# 実データの代わりにオブジェクトインスタンスを使用
queue.put(object()) # 上のget() より先に実行される
thread.join()
print('Producer done')
実行結果
consumer waiting
Producer putting
Consumer done
Producer done
パイプライン渋滞を解消するために、Queueクラスでは、2つのスレッド間での処理待ちデータの最大量を指定できます。
queue = Queue(1) # スレッド間での処理待ちデータは1つ
def consumer():
time.sleep(0.1)
queue.get() # 2番目の実行
print('Consumer got 1')
queue.get() # 4番目の実行
print('Consumer got 2')
thread = Thread(target=consumer)
thread.start()
# 処理待ちデータが1なので、putを実行した後に一度getが実行されないと2つ目のputは実行されない
queue.put(object())
print('Producer put 1') # 1番目の実行
queue.put(object())
print('Producer put 2') # 3番目の実行
thread.join()
print('Producer done')
実行結果
Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done
Queueクラスのtask_doneメソッドを使うことで作業進捗の追跡をすることで、パイプラインの終端で、ポーリング(データ数を常に確認)する必要が無くなります。
in_queue = Queue()
def consumer():
print('Consumer waiting')
work = in_queue.get() # 2番目 完了
print('Consumer working')
# 作業中
print('Consumer done')
in_queue.task_done() # 3番目 完了
Thread(target=consumer).start()
# Producerは、Queueインスタンスでjoinを呼び出し、in_queueが終了するのを待つ
# 空になっていても、in_queueは、すべてのデータでtask_doneが呼ばれるまでjoinできない
in_queue.put(object())
print('Producer waiting') # 1番目 完了
in_queue.join()
print('Producer done') # 4番目 完了
実行結果
Consumer waiting
Producer waiting
Consumer working
Consumer done
Producer done
キューに対して終端を知らせる特別な要素を追加するcloseメソッドを実装することで、download, resize, upload スレッドに対していつ処理を止めるべきかを通知するようにできます。また、__iter__
メソッドを定義することで、適当な時期にtask_doneを呼び出して、キューの作業進捗を確認します。
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # スレッドを終了させる
yield item
finally:
self.task_done()
スレッドがClosableQueueクラスの振る舞いに依存するように定義し、一連の動作をするコードを書きます。
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # スレッドを終了させる
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
# download, resize, upload の疑似関数
def download(item):
return item
def resize(item):
return item
def upload(item):
return item
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
StoppableWorker(download, download_queue, resize_queue),
StoppableWorker(resize, resize_queue, upload_queue),
StoppableWorker(upload, upload_queue, done_queue),
]
# すべての入力作業が投入されたら、第一段階の入力キューを閉じて停止信号を送る
for thread in threads:
thread.start()
for _ in range(1000):
download_queue.put(object())
download_queue.close()
# 各段階を結合してたキューをジョインして作業終了を待つ
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')
# 1000 items finished
Queueクラスを使うことで、並行作業でキューを扱う際の問題点を気にすることなく、コードを書くことができるようになります。
Author And Source
この問題について(Pythonのスレッド間の協調作業にはQueueを使う), 我々は、より多くの情報をここで見つけました https://qiita.com/virusyun/items/f3716e4ec42b6b9cd67b著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .