python Queueによるプロセス間通信
9163 ワード
プロセス間通信-Queue Process間では通信が必要になる場合があり、オペレーティングシステムはプロセス間の通信を実現するための多くのメカニズムを提供しています. Queueの使用はmultiprocessingモジュールのQueueを使用してマルチプロセス間のデータ伝達を実現することができ、Queue自体はメッセージキュープログラムであり、まず小さな例でQueueの動作原理を実証する:
実行結果:
説明Queue()オブジェクトを初期化する場合(例えば、q=Queue())、カッコに最大受信可能なメッセージ数が指定されていない場合、または数が負の値である場合、許容可能なメッセージ数に上限がない(メモリの終端まで).
Queue.qsize():現在のキューに含まれるメッセージの数を返します.
Queue.Empty():キューが空の場合はTrueを返し、逆にFalseを返します.
Queue.full():キューがいっぱいになったら、Trueを返し、逆にFalseを返します.
Queue.get([block[,timeout]):キュー内のメッセージを取得し、キューから削除します.blockのデフォルト値はTrueです.
1)ブロックがデフォルト値を使用する、timeout(単位秒)が設定されていない場合、メッセージ列が空の場合、プログラムはブロックされ(読み出し状態で停止)、メッセージ列からメッセージが読み出されるまで、timeoutが設定されている場合、timeout秒が待機し、メッセージがまだ読み出されていない場合、「Queue」が投げ出される.Empty異常
2)block値がFalseの場合、メッセージ列が空の場合、すぐに「Queue.Empty異常
Queue.get_nowait():かなりQueue.get(False);
Queue.put(item,[block[,timeout]):itemメッセージをキューに書き込み、blockのデフォルト値はTrue;
1)ブロックがデフォルト値を使用する、timeout(単位秒)が設定されていない場合、メッセージ列に書き込み可能なスペースがない場合、プログラムはブロック(書き込み状態で停止)され、メッセージ列からスペースが空けるまで、timeoutが設定されている場合、timeout秒が待つ、スペースがない場合は「Queue」が投げ出される.Full異常
2)block値がFalseの場合、メッセージ列に書き込み可能なスペースがない場合は、「Queue.Full異常
Queue.put_nowait(item):かなりQueue.put(item, False); 2.QueueインスタンスQueueを例に、親プロセスで2つのサブプロセスを作成します.1つはQueueにデータを書き、もう1つはQueueからデータを読みます.
実行結果:プロセスプールのQueue Poolを使用してプロセスを作成するにはmultiprocessingを使用する必要があります.Multiprocessingではなく、Manager()のQueue()です.Queue()です.そうしないと、 というエラーメッセージが表示されます.
次の例では、プロセスプール内のプロセスがどのように通信するかを示します.
実行結果:
#coding=utf-8
from multiprocessing import Queue
q=Queue(3) # Queue , put
q.put(" 1")
q.put(" 2")
print(q.full()) #False
q.put(" 3")
print(q.full()) #True
# try , try 2 , Try
try:
q.put(" 4",True,2)
except:
print(" , :%s"%q.qsize())
try:
q.put_nowait(" 4")
except:
print(" , :%s"%q.qsize())
# , ,
if not q.full():
q.put_nowait(" 4")
# , ,
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
実行結果:
False
True
, :3
, :3
1
2
3
説明Queue()オブジェクトを初期化する場合(例えば、q=Queue())、カッコに最大受信可能なメッセージ数が指定されていない場合、または数が負の値である場合、許容可能なメッセージ数に上限がない(メモリの終端まで).
Queue.qsize():現在のキューに含まれるメッセージの数を返します.
Queue.Empty():キューが空の場合はTrueを返し、逆にFalseを返します.
Queue.full():キューがいっぱいになったら、Trueを返し、逆にFalseを返します.
Queue.get([block[,timeout]):キュー内のメッセージを取得し、キューから削除します.blockのデフォルト値はTrueです.
1)ブロックがデフォルト値を使用する、timeout(単位秒)が設定されていない場合、メッセージ列が空の場合、プログラムはブロックされ(読み出し状態で停止)、メッセージ列からメッセージが読み出されるまで、timeoutが設定されている場合、timeout秒が待機し、メッセージがまだ読み出されていない場合、「Queue」が投げ出される.Empty異常
2)block値がFalseの場合、メッセージ列が空の場合、すぐに「Queue.Empty異常
Queue.get_nowait():かなりQueue.get(False);
Queue.put(item,[block[,timeout]):itemメッセージをキューに書き込み、blockのデフォルト値はTrue;
1)ブロックがデフォルト値を使用する、timeout(単位秒)が設定されていない場合、メッセージ列に書き込み可能なスペースがない場合、プログラムはブロック(書き込み状態で停止)され、メッセージ列からスペースが空けるまで、timeoutが設定されている場合、timeout秒が待つ、スペースがない場合は「Queue」が投げ出される.Full異常
2)block値がFalseの場合、メッセージ列に書き込み可能なスペースがない場合は、「Queue.Full異常
Queue.put_nowait(item):かなりQueue.put(item, False); 2.QueueインスタンスQueueを例に、親プロセスで2つのサブプロセスを作成します.1つはQueueにデータを書き、もう1つはQueueからデータを読みます.
from multiprocessing import Process, Queue
import os, time, random
# :
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# :
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break
if __name__=='__main__':
# Queue, :
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# pw, :
pw.start()
# pw :
pw.join()
# pr, :
pr.start()
pr.join()
# pr , , :
print ''
print ' '
実行結果:
Put A to queue...
Put B to queue...
Put C to queue...
Get A to queue...
Get B to queue...
Get C to queue...
RuntimeError: Queue objects should only be shared between processes through inheritance.
次の例では、プロセスプール内のプロセスがどのように通信するかを示します.
#coding=utf-8
# import Queue Manager
from multiprocessing import Manager,Pool
import os,time,random
def reader(q):
print("reader (%s), (%s)"%(os.getpid(),os.getppid()))
for i in range(q.qsize()):
print("reader Queue :%s"%q.get(True))
def writer(q):
print("writer (%s), (%s)"%(os.getpid(),os.getppid()))
for i in "dongGe":
q.put(i)
if __name__=="__main__":
print("(%s) start"%os.getpid())
q=Manager().Queue() # Manager Queue
po=Pool()
# , reader , writer , reader
po.apply(writer,(q,))
po.apply(reader,(q,))
po.close()
po.join()
print("(%s) End"%os.getpid())
実行結果:
(21156) start
writer (21162), (21156)
reader (21162), (21156)
reader Queue :d
reader Queue :o
reader Queue :n
reader Queue :g
reader Queue :G
reader Queue :e
(21156) End