Pythonマルチプロセスプログラミング

16178 ワード

Process
作成プロセスのクラス:Process([group[,target[,name[,args[,kwargs]]]]])、targetは呼び出しオブジェクトを表し、argsは呼び出しオブジェクトの位置パラメータメタグループを表す.kwargsは呼び出しオブジェクトの辞書を表す.nameは別名です.グループは実質的に使用されません.
方法:is_alive()、join([timeout])、run()、start()、terminate().ここで、Processはstart()でプロセスを開始します.
 
プロパティ:authkey、daemon(start()で設定する)、exitcode(プロセスが実行時にNone、-Nの場合は被信号Nで終了)、name、pid.daemonは親プロセスが終了した後に自動的に終了し、自分が新しいプロセスを生成できない場合はstart()の前に設定する必要があります.
 
例1.1:関数を作成し、単一プロセスとする
import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (1,)) #target      ,args           #
    p.start()
    print ("p.pid:", p.pid)
    print ("p.name:", p.name)
    print ("p.is_alive:", p.is_alive())

結果:
p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

 
例1.2:関数を作成して複数のプロセスとする
import multiprocessing
import time

def worker_1(interval):
    print ("worker_1")
    time.sleep(interval)
    print ("end worker_1")

def worker_2(interval):
    print ("worker_2")
    time.sleep(interval)
    print ("end worker_2")

def worker_3(interval):
    print ("worker_3")
    time.sleep(interval)
    print ("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print ("END!!!!!!!!!!!!!!!!!")

結果:
The number of CPU is:4
child   p.name:Process-3    p.id7992
child   p.name:Process-2    p.id4204
child   p.name:Process-1    p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

 
例1.3:プロセスをクラスとして定義する
import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()      

 
注意:プロセスpがstart()を呼び出すとrun()が自動的に呼び出されます.
結果
the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

 
 
Pool
Pythonを利用してシステム管理を行う場合、特に複数のファイルディレクトリを同時に操作したり、複数のホストを遠隔制御したりすることで、並列操作は多くの時間を節約することができる.操作対象の数が少ない場合、multiprocessingの中のProcessを直接利用して複数のプロセスを動的に生成することができ、十数個はまあまあだが、百個以上、千個以上の目標であれば、手動で制限することができるプロセスの数が煩雑すぎるため、プロセスプールの機能を発揮することができます.Poolはユーザーが呼び出すために指定された数のプロセスを提供することができます.新しい要求がpoolに提出されたとき、プールがまだいっぱいでない場合、新しいプロセスを作成して要求を実行します.しかし、プールのプロセス数が所定の最大値に達した場合、要求は待機します.プールにプロセスが終了すると、新しいプロセスが作成されます.
 
例1:プロセスプールの使用(ブロックしない)
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print ("msg:", msg)
    time.sleep(1)
    print ("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #    ,          processes(     3),                   

    print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #  join  ,   close  ,     。   close           pool,join           
    print ("Sub-process(es) done.")

実行結果
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

関数の説明:
  • apply_async(func[,args[,kwds[,callback]]])非ブロッキングであり、apply(func[,args[,kwds]])はブロッキングである(理解の違い、例1例2の結果の違いを参照)
  • close()    poolを閉じて、新しいタスクを受け入れないようにします.
  • terminate()    完了していないタスクを処理していないワークプロセスを終了します.
  • join()    メインプロセスがブロックされ、サブプロセスの終了を待機します.joinメソッドはcloseまたはterminateの後に使用します.
  • 実行説明:プロセスプールpoolを作成し、プロセス数を3に設定すると、xrange(4)は4つのオブジェクト[0,1,2,4]を相次いで生成し、4つのオブジェクトがpoolにコミットされる.poolはプロセス数が3であることを指定するため、0,1,2は直接プロセスに送られて実行され、そのうちの1つが実行された後に1つのプロセス処理オブジェクト3が空になるため、出力「msg:hello 3」が現れる「end」の後に表示されます.非ブロックであるため、メイン関数は独自に実行され、プロセスの実行を無視するので、forループを実行した後に直接「mMsg:hark~Mark~Mark~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~」と出力され、メインプログラムはpool.join()で各プロセスの結びを待ちます.
     
    例2:プロセスプールの使用(ブロック)
    #coding: utf-8
    import multiprocessing
    import time
    
    def func(msg):
        print ("msg:", msg)
        time.sleep(1)
        print ("end")
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes = 3)
        for i in range(4):
            msg = "hello %d" %(i)
            pool.apply(func, (msg, ))   #    ,          processes(    3),                   
    
        print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
        pool.close()
        pool.join()   #  join  ,   close  ,     。   close           pool,join           
        print ("Sub-process(es) done.")

    実行結果:
    msg: hello 0
    end
    msg: hello 1
    end
    msg: hello 2
    end
    msg: hello 3
    end
    Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
    Sub-process(es) done.