multiprocessingで一つのtqdmを複数プロセスから更新する
複数プロセス(worker)を並列実行して,一つのプログレスバーを更新したい.
overview
-
Pool
+ imap_unordered()
という手もあるが,
- 引数にリストなどのiterableを渡さなければならない
- 一つのiterableしか
imap_unordered()
の引数に渡せない.
- workerの引数も一つしか渡せない.(単純にやるからだけど)
- そこで
Process
+ queue
を使ってみる.
- リストの代わりに
queue
に詰める.
-
tqdm
をManager
で管理する.
- workerに
queue
とtqdm
も渡す.
- workerの中で
tqdm
をupdate
motivation
Pool
+ imap_unordered()
という手もあるが,
- 引数にリストなどのiterableを渡さなければならない
- 一つのiterableしか
imap_unordered()
の引数に渡せない. - workerの引数も一つしか渡せない.(単純にやるからだけど)
Process
+ queue
を使ってみる.
- リストの代わりに
queue
に詰める. -
tqdm
をManager
で管理する. - workerに
queue
とtqdm
も渡す.- workerの中で
tqdm
をupdate
- workerの中で
pool.imap_unordered()
を使っていたけれども,各workerに渡すのが別々のものと共通のものとが混在していて,iterableを作る効率が悪い.
そこで別々のものはqueueに詰めてしまって,各workerはそこから取り出すことにする.共通のものは単にworkerの引数にわたすだけ.
code
ではまず必要なモジュールのインポート.
モジュールのインポート
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
from tqdm import tqdm
import queue
次にworkerの定義.
- 引数はキューq,ロックオブジェクト,tqdmのプログレスバー,worker番号
- worker番号
i
は単なる例.なんでもよい.
- worker番号
- キューから取り出す.キューが空なら即終了.キューが空になるまでループ.
- 取り出したデータを処理
- lockしてtqdmのプログレスバーを更新
- lockしないと更新表示がずれる
def worker(q, lock, pbar, i):
while True:
try:
item = q.get(timeout=0)
except queue.Empty:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))
tqdmをmanagerで管理するための定義.
class TqdmManager(BaseManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
TqdmManager.register('Tqdm', tqdm)
ではメイン.
- キュー
q
,ロック,tqdmプログレスバーpbar
のオブジェクトをmanagerで作成. -
q
にデータを詰める -
Process
で個数num_worker
のプロセスを生成.- 引数は
args
で指定.ここでiterable以外にいろいろ渡せる.
- 引数は
- workerをスタート.
-
join
で終了まで待つ. -
close
でクリーンアップ.
-
- 最後に
pbar
をclose
メイン
if __name__ == '__main__':
with TqdmManager() as tqdm_manager, Manager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = tqdm_manager.Tqdm(total=len_data)
for item in range(len_data):
q.put(item)
p_all = [Process(target=worker,
args=(q, lock, pbar, i))
for i in range(num_workers)]
[p.start() for p in p_all]
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
execute
$ python3 ./queue_test.py
100%|█████████████████████████████████| 1182/1182 [00:01<00:00, 601.64it/s, 1181 1]
$
limitation
$ python3 ./queue_test.py
100%|█████████████████████████████████| 1182/1182 [00:01<00:00, 601.64it/s, 1181 1]
$
managerを使うと,そのための別プロセスが起動するのでやや遅くなる.
misc
-
tqdm
をBaseManager
で派生して管理するという例は見かけない.なんでもmanagerにできるので便利.
-
[p.start() for p in p_all]
というインライン表記もあまり見かけない.なぜ?
Yet another version
tqdm
をBaseManager
で派生して管理するという例は見かけない.なんでもmanagerにできるので便利.[p.start() for p in p_all]
というインライン表記もあまり見かけない.なぜ?queueにデータを入れるのが時間かかる場合,先にworkerをスタートしてからqueueにデータを投げ入れる.
- queueに入れればすぐにworkerに受け渡される
- workerでは
q.get
のtimeout
を1程度にしておく(データ待ちの時間が発生するため) - queueに入れるデータが尽きたら,queueに終了目印の
None
を入れておく.- workerの方は
None
が来たら即終了
- workerの方は
queue_test.py
from multiprocessing import Process, Manager
from tqdm import tqdm
import queue
from multiprocessing.managers import BaseManager
def worker(q, lock, pbar, i):
while True:
try:
item = q.get(timeout=1)
except queue.Empty:
return
if item is None:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))
class TqdmManager(BaseManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
TqdmManager.register('Tqdm', tqdm)
if __name__ == '__main__':
with TqdmManager() as tqdm_manager, Manager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = tqdm_manager.Tqdm(total=len_data)
p_all = [Process(target=worker,
args=(q, lock, pbar, i))
for i in range(num_workers)]
[p.start() for p in p_all]
for item in range(len_data):
q.put(item)
for _ in range(num_workers):
q.put(None)
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
Author And Source
この問題について(multiprocessingで一つのtqdmを複数プロセスから更新する), 我々は、より多くの情報をここで見つけました https://qiita.com/tttamaki/items/96b65e6555f9d255ffd9著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .