multiprocessingで一つのtqdmを複数プロセスから更新する


複数プロセス(worker)を並列実行して,一つのプログレスバーを更新したい.

overview

  • Pool + imap_unordered()という手もあるが,
    • 引数にリストなどのiterableを渡さなければならない
    • 一つのiterableしかimap_unordered()の引数に渡せない.
    • workerの引数も一つしか渡せない.(単純にやるからだけど)
  • そこでProcess + queueを使ってみる.
    • リストの代わりにqueueに詰める.
    • tqdmManagerで管理する.
    • workerにqueuetqdmも渡す.
      • workerの中でtqdmをupdate

motivation

pool.imap_unordered()を使っていたけれども,各workerに渡すのが別々のものと共通のものとが混在していて,iterableを作る効率が悪い.

そこで別々のものはqueueに詰めてしまって,各workerはそこから取り出すことにする.共通のものは単にworkerの引数にわたすだけ.

code

ではまず必要なモジュールのインポート.

モジュールのインポート
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
from tqdm import tqdm
import queue

次にworkerの定義.

  • 引数はキューq,ロックオブジェクト,tqdmのプログレスバー,worker番号
    • worker番号iは単なる例.なんでもよい.
  • キューから取り出す.キューが空なら即終了.キューが空になるまでループ.
  • 取り出したデータを処理
  • lockしてtqdmのプログレスバーを更新
    • lockしないと更新表示がずれる
def worker(q, lock, pbar, i):

    while True:
        try:
            item = q.get(timeout=0)
        except queue.Empty:
            return

        # ここでitemを処理

        with lock:
            pbar.update(1)
            pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))

tqdmをmanagerで管理するための定義.

class TqdmManager(BaseManager):
    # https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
    pass


TqdmManager.register('Tqdm', tqdm)

ではメイン.

  • キューq,ロック,tqdmプログレスバーpbarのオブジェクトをmanagerで作成.
  • qにデータを詰める
  • Processで個数num_workerのプロセスを生成.
    • 引数はargsで指定.ここでiterable以外にいろいろ渡せる.
  • workerをスタート.
    • joinで終了まで待つ.
    • closeでクリーンアップ.
  • 最後にpbarclose
メイン
if __name__ == '__main__':

    with TqdmManager() as tqdm_manager, Manager() as manager:

        num_workers = 13
        len_data = 1182

        q = manager.Queue()
        lock = manager.Lock()
        pbar = tqdm_manager.Tqdm(total=len_data)

        for item in range(len_data):
            q.put(item)

        p_all = [Process(target=worker,
                         args=(q, lock, pbar, i))
                 for i in range(num_workers)]
        [p.start() for p in p_all]
        [p.join() for p in p_all]
        [p.close() for p in p_all]

        pbar.close()

execute

$ python3 ./queue_test.py
100%|█████████████████████████████████| 1182/1182 [00:01<00:00, 601.64it/s, 1181   1]
$ 

limitation

managerを使うと,そのための別プロセスが起動するのでやや遅くなる.

misc

  • tqdmBaseManagerで派生して管理するという例は見かけない.なんでもmanagerにできるので便利.
  • [p.start() for p in p_all]というインライン表記もあまり見かけない.なぜ?

Yet another version

queueにデータを入れるのが時間かかる場合,先にworkerをスタートしてからqueueにデータを投げ入れる.

  • queueに入れればすぐにworkerに受け渡される
  • workerではq.gettimeoutを1程度にしておく(データ待ちの時間が発生するため)
  • queueに入れるデータが尽きたら,queueに終了目印のNoneを入れておく.
    • workerの方はNoneが来たら即終了
queue_test.py
from multiprocessing import Process, Manager
from tqdm import tqdm
import queue
from multiprocessing.managers import BaseManager


def worker(q, lock, pbar, i):

    while True:
        try:
            item = q.get(timeout=1)
        except queue.Empty:
            return
        if item is None:
            return

        # ここでitemを処理

        with lock:
            pbar.update(1)
            pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))


class TqdmManager(BaseManager):
    # https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
    pass


TqdmManager.register('Tqdm', tqdm)


if __name__ == '__main__':

    with TqdmManager() as tqdm_manager, Manager() as manager:

        num_workers = 13
        len_data = 1182

        q = manager.Queue()
        lock = manager.Lock()
        pbar = tqdm_manager.Tqdm(total=len_data)

        p_all = [Process(target=worker,
                         args=(q, lock, pbar, i))
                 for i in range(num_workers)]
        [p.start() for p in p_all]

        for item in range(len_data):
            q.put(item)
        for _ in range(num_workers):
            q.put(None)

        [p.join() for p in p_all]
        [p.close() for p in p_all]

        pbar.close()