Pythonマルチプロセス同時操作におけるプロセスプールPoolの応用

4579 ワード

Pythonでシステム管理を行う場合、特に複数のファイルディレクトリを同時に操作したり、複数のホストをリモートで制御したりすることで、並列操作で多くの時間を節約できます.操作対象の数が少ない場合は、multiprocessingのProcessを直接利用して複数のプロセスを動的に生成することができます.10個はまあまあですが、100個以上、千個以上のターゲットであれば、手動でプロセスの数を制限するのは煩雑すぎます.このとき、プロセスプールPoolが機能するときです.Poolは、ユーザーが呼び出すために指定された数のプロセスを提供し、新しいリクエストがpoolにコミットされた場合、プールが満たされていない場合、
リクエストを実行するための新しいプロセスが作成されます.ただし、プール内のプロセス数が所定の最大値に達した場合、
リクエストはプールでプロセスが終了するまで待機します.
新しいプロセスが作成されます.ここには簡単な例があります.
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #!/usr/bin/env python #coding=utf-8     from   multiprocessing  import   Pool from   time  import   sleep   def   f(x):      for   in   range ( 10 ):          print   '%s --- %s '   %   (i, x)          sleep( 1 )     def   main():      pool  =   Pool(processes = 3 )     # set the processes max number 3      for   in   range ( 11 , 20 ):          result  =   pool.apply_async(f, (i,))      pool.close()      pool.join()      if   result.successful():          print   'successful'     if   __name__  = =   "__main__" :      main()
 
まず容量3のプロセスプールを作成し、f(i)を順次渡し、スクリプトを実行した後にps aux|grep poolを利用する.pyプロセスの状況を見ると、最大3つのプロセスしか実行されないことがわかります.pool.apply_async()はプロセスプールにターゲットリクエストを送信するために使用されます.pool.join()は、プロセスプール内のworkerプロセスの実行が完了するのを待つために使用され、メインプロセスがworkerプロセスの終了前に終了することを防止します.でも必ずjoin()はpoolで使用する必要があります.close()またはpool.terminate()の後.ここでclose()とterminate()の違いは、close()がプール内のworkerプロセスの実行が終了してからpoolを閉じるのを待つことであり、terminate()は直接閉じることである.result.successful()は呼び出し実行全体の状態を表し、workerがまだ実行されていない場合はAssertionError異常が放出されます.MultiprocessingのPoolを利用することで、同時に数百あるいは数千の並列操作を自動的に処理することができ、スクリプトの複雑さも大幅に低減することができる.
 
pythonでmultiprocessing.pool関数の紹介
 
1 apply(func[,args[,kwds]]))applyは不定パラメータを伝達するために使用され、pythonのapply関数と一致します(ただし、内蔵apply関数は2.3以降は使用を推奨しません)、メインプロセスは関数にブロックされます.for x in gen_list(l):     result = pool.apply(pool_test,(x,))print'main process'このときメインプロセスの実行プロセスは単一プロセスと一致する2 apply_async(func[,args[,kwds[,callback]]]))はapplyの使用法と一致するが、非ブロックであり、サポート結果が戻った後にコールバックされる.for x in gen_list(l):     result = pool.apply_async(pool_test,(x,))print'main process'この時点でメインプロセスループ実行中にapplyを待たないasyncの戻り結果は、メインプロセスが終了すると、サブプロセスがプログラム全体を返さなくても終了します.apply_asyncは非ブロックであるが、結果を返すgetメソッドはブロックであり、この例ではresult.get()はメインプロセスをブロックします.したがって、[x.get()for x in[pool.apply_async(pool_test,(x,))for x in gen_list(l)]を処理することができる.closeとpool.joinは、プライマリ・プロセスの終了を防止します.注意joinメソッドはcloseまたはterminateの後に呼び出さなければなりません.    for x in gen_list(l):     pool.apply_async(pool_test, (x, ))     print 'main_process'     pool.close()     pool.join()三map(func,iterable[,chunksize])mapメソッドは、組み込まれたmap関数の動作とほぼ一致し、結果が戻るまでプロセスをブロックします.ただし、2番目のパラメータはiterableとして記述されていますが、実際の使用では、キュー全体が準備されている場合にのみ、プログラムがサブプロセスを実行することがわかります.四map_async(func,iterable[,chunksize[,callback]))はmapの用法と一致するが,非ブロックである.その関連事項はapply_を参照してください.async. 五imap(func,iterable[,chunksize])はmapと異なり、imapの戻り結果はiterであり、メインプロセスでnextをアクティブに使用してサブプロセスの呼び出しを駆動する必要がある.サブプロセスが結果を返さなくても、メインプロセスはgen_List(l)のiterは引き続き行われ、python 2による.6ドキュメントの説明では、ビッグデータ量のiterableでは、chunksizeをデフォルトの1より大きく設定します.   for x in pool.imap(pool_test,gen_list(l):pass六imap_unordered(func,iterable[,chunksize])はimapと一致するが、戻り結果が反復伝達の順序と一致することは保証されない.7 close()poolを閉じて、新しいタスクを受け入れないようにします.8 terminate()は、未処理のタスクを処理しないで作業プロセスを終了します.9 join()プライマリ・プロセスはサブプロセスの終了を待つブロックであり、joinメソッドはcloseまたはterminateの後に使用されます.
 
l = range(10) def gen_list(l):     for x in l:         print 'yield', x         yield x
def pool_test(x):     print 'f2', x     time.sleep(1)
 
私は負けることができますが、私は負けません.