マルチプロセス(二)-信号伝達とプロセス制御
14088 ワード
コンテンツディレクトリ:
1. multiprocessing.Queue()
スレッドと同様に、マルチプロセスの一般的な使用パターンは、1つのタスクを複数のworkerに分割して並列に実行することです.マルチプロセスを効率的に使用するには、通常、それらの間のいくつかの通信が必要であり、これにより、作業が分割され、結果が集約されることができる.1つの簡単な方法は、キュー
multiprocessing.Queue()
を使用してメッセージを往復伝達することである.pickleでシーケンス化できる任意のオブジェクトは、キューを通過できます.import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Doing something fancy in {} for {}!'.format(
proc_name, self.name))
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
結果:qが空の場合、q.get()が待ちます.
Doing something fancy in Process-1 for Fancy Da
2. JoinableQueue
JoinableQueueのインスタンスpは、Queueオブジェクトと同じ方法の他に、以下の方法を有する.
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print('{}: Exiting'.format(proc_name))
self.task_queue.task_done()
break
# next_task Task() , next_task __str__
print('{}: {}'.format(proc_name, next_task))
# next_task() __call__
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task:
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take time to do the work
return '{self.a} * {self.b} = {product}'.format(
self=self, product=self.a * self.b)
def __str__(self):
return '{self.a} * {self.b}'.format(self=self)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print('Creating {} consumers'.format(num_consumers))
consumers = [
Consumer(tasks, results)
for i in range(num_consumers)
]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in range(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print('Result:', result)
num_jobs -= 1
実行結果:
Creating 8 consumers
Consumer-4: 0 * 0
Consumer-1: 1 * 1
Consumer-2: 2 * 2
Consumer-4: 3 * 3
Consumer-1: 4 * 4
Consumer-2: 5 * 5
Consumer-1: 6 * 6
Consumer-6: 7 * 7
Consumer-4: 8 * 8
Consumer-2: 9 * 9
Consumer-1: Exiting
Consumer-4: Exiting
Consumer-6: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-8: Exiting
Consumer-3: Exiting
Consumer-7: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 4 * 4 = 16
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 7 * 7 = 49
Result: 9 * 9 = 81
3.プロセス間の信号伝達イベント
Eventクラスは、プロセス間でステータス情報を伝達する簡単な方法を提供します.wait()が超過した場合、エラーは返されません.呼び出し元はis_の使用を担当します.set()イベントの状態をチェックする
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
実行結果:
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True
4.リソースへのアクセス制御ロック
複数のプロセス間で単一のリソースを共有する必要がある場合は、ロックを使用して競合するアクセスを回避できます.
import multiprocessing
import sys
def worker_with(lock):
with lock:
sys.stdout.write('Lock acquired via with
')
def worker_no_with(lock):
lock.acquire()
try:
sys.stdout.write('Lock acquired directly
')
finally:
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
w = multiprocessing.Process(
target=worker_with,
args=(lock,),
)
nw = multiprocessing.Process(
target=worker_no_with,
args=(lock,),
)
w.start()
nw.start()
w.join()
nw.join()
実行結果:
Lock acquired via with
Lock acquired directly
5.同期操作Condition
cond.待ってるよnotify_all()通知は下へ実行できます
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
実行結果:この例では、2つのプロセスが第2のフェーズの作業を並列に実行しますが、第1のフェーズが完了した後にのみ実行されます.
Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running
6.リソースへの同時アクセスの制御Semaphore
複数のworkerが1つのリソースに一度にアクセスできるようにすることは便利ですが、数を制限する必要があります.
import multiprocessing
import time
def worker(s, i):
s.acquire()
print(multiprocessing.current_process().name + "acquire");
time.sleep(i)
print(multiprocessing.current_process().name + "release
");
s.release()
if __name__ == "__main__":
s = multiprocessing.Semaphore(2)
for i in range(5):
p = multiprocessing.Process(target = worker, args=(s, i*2))
p.start()
実行結果:
Process-2acquire
Process-3acquire
Process-2release
Process-4acquire
Process-3release
Process-1acquire
Process-1release
Process-5acquire
Process-4release
Process-5release
7.共有ステータスマネージャの管理
Managerで情報を共有することで、すべてのプロセスが表示されます.
import multiprocessing
import pprint
def worker(d, key, value):
d[key] = value
if __name__ == '__main__':
mgr = multiprocessing.Manager()
d = mgr.dict()
jobs = [
multiprocessing.Process(
target=worker,
args=(d, i, i * 2),
)
for i in range(10)
]
for j in jobs:
j.start()
for j in jobs:
j.join()
print('Results:', d
実行結果:マネージャによってリストが作成され、共有され、すべてのプロセスで更新が表示されます.辞書もサポートしています.
Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}
8.共有ネーミングスペースManager
辞書とリストに加えて、管理者は共有された名前空間を作成することもできます.
import multiprocessing
def producer(ns, event):
ns.value = 'This is the value'
event.set()
def consumer(ns, event):
try:
print('Before event: {}'.format(ns.value))
except Exception as err:
print('Before event, error:', str(err))
event.wait()
print('After event:', ns.value)
if __name__ == '__main__':
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
実行結果:別のプロセスでmgr.Namespace()はレプリケーションを行い、他のプロセスにアクセスできます.
Before event, error: 'Namespace' object has no attribute 'value'
After event: This is the value
重要なのはmgr.を知ることですNamespace()の可変値のコンテンツの更新は自動的に伝播しません.
import multiprocessing
def producer(ns, event):
# DOES NOT UPDATE GLOBAL VALUE!
ns.my_list.append('This is the value')
event.set()
def consumer(ns, event):
print('Before event:', ns.my_list)
event.wait()
print('After event :', ns.my_list)
if __name__ == '__main__':
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
namespace.my_list = []
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
実行結果:
Before event: []
After event : []
9.プロセスプールPool
プールクラスは、固定数のworkerを管理し、簡単な作業に使用できます.この場合、作業を分解して独立してworkerに割り当てることができます.
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print('Starting', multiprocessing.current_process().name)
if __name__ == '__main__':
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
print('Pool :', pool_outputs)
実行結果:プロセスの戻り値が収集され、リストとして返されます.
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in:
デフォルトでは、プールは一定数のworkerプロセスを作成し、より多くの作業がなくなるまでジョブを渡します.maxtasksperchildパラメータを設定すると、プールがいくつかのタスクを完了した後にworkerプロセスを再起動し、長時間実行するworkerがより多くのシステムリソースを消費することを防止します.
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print('Starting', multiprocessing.current_process().name)
if __name__ == '__main__':
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
maxtasksperchild=2,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
print('Pool :', pool_outputs)
実行結果:労働者が割り当てられたタスクを完了すると、より多くの仕事がなくても、仕事を再開します.この出力では、10個のタスクのみにもかかわらず、1回に2つのタスクを完了できるワークが9個作成されます.
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: