Pythonカスタムプロセスプール(生産者/消費者モデル)
5167 ワード
コードはすべてを説明します.
ps:
使用可能
代替.
*** Updated 2016-01-06 ***
面白い例です
Windows 7で実行した結果:
Ubuntu 14.04で実行した結果:
Windows 7で2回目の修正が成功せず、Ubuntuで修正が成功したことがわかります.uliwebの著者limodou氏によると、Windowsの下では再起動を実現するサブプロセスであるためだ.Linuxの下ではforkが実現しています.
参照先:
0、公式マルチプロセスドキュメント.
1、Python並列タスクテクニック
2、pythonでのマルチプロセス処理
3、pythonのthreadingとmultiprocessingモジュール
*** walker * 2014-05-21 ***
#encoding=utf-8
#author: walker
#date: 2014-05-21
#function:
from multiprocessing import Process, Queue, Lock
import time, os
#
class Consumer(Process):
def __init__(self, queue, ioLock):
super(Consumer, self).__init__()
self.queue = queue
self.ioLock = ioLock
def run(self):
while True:
task = self.queue.get() # ,
if isinstance(task, str) and task == 'quit':
break;
time.sleep(1) # 1
self.ioLock.acquire()
print( str(os.getpid()) + ' ' + task)
self.ioLock.release()
self.ioLock.acquire()
print 'Bye-bye'
self.ioLock.release()
#
def Producer():
queue = Queue() # /
ioLock = Lock()
subNum = 4 #
workers = build_worker_pool(queue, ioLock, subNum)
start_time = time.time()
for parent, dirnames, filenames in os.walk(r'D:\test'):
for filename in filenames:
queue.put(filename)
ioLock.acquire()
print('qsize:' + str(queue.qsize()))
ioLock.release()
while queue.qsize() > subNum * 10: #
time.sleep(1)
for worker in workers:
queue.put('quit')
for worker in workers:
worker.join()
ioLock.acquire()
print('Done! Time taken: {}'.format(time.time() - start_time))
ioLock.release()
#
def build_worker_pool(queue, ioLock, size):
workers = []
for _ in range(size):
worker = Consumer(queue, ioLock)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Producer()
ps:
self.ioLock.acquire()
...
self.ioLock.release()
使用可能
with self.ioLock:
...
代替.
*** Updated 2016-01-06 ***
面白い例です
#encoding=utf-8
#author: walker
#date: 2016-01-06
#function:
import os, sys, time
from multiprocessing import Pool
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
g_List = ['a']
# g_List
def ModifyDict_1():
global g_List
g_List.append('b')
# g_List
def ModifyDict_2():
global g_List
g_List.append('c')
#
def ProcOne(num):
print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))
#
def ProcAll():
pool = Pool(processes = 4)
for i in range(1, 20):
#ProcOne(i)
#pool.apply(ProcOne, (i,))
pool.apply_async(ProcOne, (i,))
pool.close()
pool.join()
ModifyDict_1() # g_List
if __name__ == '__main__':
ModifyDict_2() # g_List
print('In main g_List :' + repr(g_List))
ProcAll()
Windows 7で実行した結果:
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']
Ubuntu 14.04で実行した結果:
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']
Windows 7で2回目の修正が成功せず、Ubuntuで修正が成功したことがわかります.uliwebの著者limodou氏によると、Windowsの下では再起動を実現するサブプロセスであるためだ.Linuxの下ではforkが実現しています.
参照先:
0、公式マルチプロセスドキュメント.
1、Python並列タスクテクニック
2、pythonでのマルチプロセス処理
3、pythonのthreadingとmultiprocessingモジュール
*** walker * 2014-05-21 ***