Pythonでのプロセスの理解:

17727 ワード

Num 01-->プロセスの作成-fork
Pythonのosモジュールには、forkを含む一般的なシステム呼び出しがカプセル化されており、Pythonプログラムでサブプロセスを簡単に作成できます.
  import os

    #   ,fork  ,  Unix/Linux/Mac   ,windows   
    pid = os.fork()

    if pid == 0:
        print('     ')
    else:
        print('     ')
        
    #    :
    #     
    #     

上記のコードは以下のように説明されています.
プログラムはosに実行する.fork()の場合、オペレーティングシステムは新しいプロセス(サブプロセス)を作成し、親プロセスのすべての情報をサブプロセスにコピーします.
次に、親プロセスとサブプロセスはfork()関数から戻り値を得ます.サブプロセスではこの値は0であり、親プロセスではサブプロセスのid番号です.
Unix/Linuxオペレーティングシステムでは、非常に特殊なfork()システム関数が提供されています.通常の関数呼び出しは、1回呼び出され、1回返されますが、fork()呼び出しは1回、2回返されます.これは、オペレーティングシステムが現在のプロセス(親プロセスと呼ばれる)を自動的に1部(子プロセスと呼ばれる)コピーし、親プロセスと子プロセス内でそれぞれ返すためです.
子プロセスは常に0を返し、親プロセスは子プロセスのIDを返します.
このようにする理由は、1つの親プロセスがforkで多くのサブプロセスを出すことができるため、親プロセスは各サブプロセスのIDをメモし、サブプロセスはgetppid()を呼び出すだけで親プロセスのIDを取得することができるからである.
Num 02-->マルチプロセスグローバル変数の変更
#coding=utf-8
import os
import time

num = 0

#   ,fork  ,  Unix/Linux/Mac   ,windows   
pid = os.fork()

if pid == 0:
    num+=1
    print('     ---num=%d'%num)
else:
    time.sleep(1)
    num+=1
    print('     ---num=%d'%num)
 #    :
 #     ---num=1
 #     ---num=1

マルチプロセスでは、各プロセスのすべてのデータ(グローバル変数を含む)が1部ずつ所有され、互いに影響しません.
Num 03-->複数回fork問題
Test 01-->forkは4つのプロセスを2回生成
#coding=utf-8
import os
import time

#   ,fork  ,  Unix/Linux/Mac   ,windows   
pid = os.fork()
if pid == 0:
    print('     fork     ')
else:
    print('     fork     ')

pid = os.fork()
if pid == 0:
    print('     fork     ')
else:
    print('     fork     ')

time.sleep(1)

Test 02-->forkは3つのプロセスを2回生成
#! /usr/bin/env python3
# -*- coding:utf-8 -*- 

import os
import time

def sing():
    print('--     fork    --')
    time.sleep(1)

def dance():
    ppid = os.fork()

    if ppid > 0:
        print('--     fork    --')
        time.sleep(1)
    elif ppid == 0:
        print('--     fork    --')
        time.sleep(1)

def main():
    pid = os.fork()
    
    if pid > 0:
        dance()
    elif pid == 0:
        sing()

if __name__ == "__main__":
    main()

Num 04-->プロセスの最初の作成方法-multiprocessing
Multiprocessingモジュールは、プロセスオブジェクトを表すProcessクラスを提供します.
#coding=utf-8
from multiprocessing import Process
import os

#          
def fun_proc(name):
    print('      ,name= %s ,pid=%d' % (name, os.getpid()))

if __name__=='__main__':
    print('    %d' % os.getpid())
    p = Process(target=fun_proc, args=('     ',))
    print('       ')
    p.start()
    p.join()
    print('      ')
#     :
#     11876
#        
#       ,name=       ,pid=14644
#       

上記のコードについて説明します.1、Processクラスでサブプロセスを作成する場合は、実行関数と関数のパラメータ(メタグループ)を1つだけ入力する必要があります.2,start()方式を呼び出してサブプロセスを開始する.3,join()メソッドは、通常、プロセス間の同期に使用されるサブプロセスが終了してから実行を続行するのを待つことができる.
Test 01-->Processの構文は次のとおりです.
Process([group [, target [, name [, args [, kwargs]]]]])
target:このプロセスインスタンスによって呼び出されたオブジェクトを表します.
args:呼び出しオブジェクトを表す位置パラメータメタグループ;
kwargs:呼び出しオブジェクトを表すキーワードパラメータ辞書;
name:現在のプロセスインスタンスの別名です.
group:ほとんどの場合、どのグループにいるかを表すために使用されません.
Processクラスの一般的な方法:
is_alive():プロセスインスタンスがまだ実行されているかどうかを判断します.
join([timeout]):プロセスインスタンスの実行が終了するのを待つか、何秒待つか.
start():プロセスインスタンスを開始します(サブプロセスを作成します).
run():targetパラメータが与えられていない場合、このオブジェクトに対してstart()メソッドを呼び出すと、オブジェクト内のrun()メソッドが実行されます.
terminate():タスクが完了するかどうかにかかわらず、すぐに終了します.
Processクラスの共通プロパティ:
name:現在のプロセスインスタンス別名、デフォルトはProcess-N、Nは1から増加する整数です.
pid:現在のプロセスインスタンスのPID値;
Test 02-->プロセスオブジェクトの作成
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2017-04-25 16:36:47
# @Author  : xiaoke
from multiprocessing import Process
import os
from time import sleep

#          
def fun_proc(name, age, **kwargs):
    for i in range(5):
        print('      ,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))
        print(kwargs)
        sleep(1)

if __name__=='__main__':
    print('    %d' % os.getpid())
    p = Process(target=fun_proc, args=('     ',66), kwargs={"  ":666})
    print('       ')
    p.start()
    sleep(1)
    # p.terminate()#        ,            
    p.join()
    print('      ')
    
#     :
#     7744
#        
#       ,name=      ,age=66 ,pid=8064...
# {'  ': 666}
#       ,name=      ,age=66 ,pid=8064...
# {'  ': 666}
#       ,name=      ,age=66 ,pid=8064...
# {'  ': 666}
#       ,name=      ,age=66 ,pid=8064...
# {'  ': 666}
#       ,name=      ,age=66 ,pid=8064...
# {'  ': 666}
#       

Test 03-->2つのプロセスオブジェクトの作成
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2017-04-25 16:36:47
# @Author  : xiaoke
#coding=utf-8
from multiprocessing import Process
import time
import os

def  worker_1(interval):
    print("worker_1,   (%s),    (%s)"%(os.getppid(),os.getpid()))
    t_start = time.time()
    time.sleep(interval) #       interval 
    t_end = time.time()
    print("worker_1,     '%0.2f' "%(t_end - t_start))

def  worker_2(interval):
    print("worker_2,   (%s),    (%s)"%(os.getppid(),os.getpid()))
    t_start = time.time()
    time.sleep(interval)
    t_end = time.time()
    print("worker_2,     '%0.2f' "%(t_end - t_start))

def main():
    #       ID
    print("  ID:%s"%os.getpid())

    #        ,target                ,
    #     name  ,          Process-N,N        
    p1=Process(target=worker_1,args=(2,))
    p2=Process(target=worker_2,name="xiaoke",args=(1,))

    #  "      .start()"           ,
    #        start ,       worker_1 worker_2      
    p1.start()
    p2.start()

    #           ,  p2      ,    True
    print("p2.is_alive=%s"%p2.is_alive())
    print("p1.is_alive=%s"%p1.is_alive())

    #  p1 p2      pid
    print("--p1      pid--")
    print("p1.name=%s"%p1.name)
    print("p1.pid=%s"%p1.pid)
    print("--p2      pid--")
    print("p2.name=%s"%p2.name)
    print("p2.pid=%s"%p2.pid)

    #join        ,             p1       ,          ,            ,
    #        ,   is_alive     True,

    #               p1.join(1),
    #  p2  2          ,     1    ,   p1      ,
    #     print   True, p1     
    print("--p1        ??--")
    p1.join()
    print("p1.is_alive=%s"%p1.is_alive())
    p2.join()
    print("p2.is_alive=%s"%p2.is_alive())

if __name__ == '__main__':
    main()

#     :
#   ID:4004
# p2.is_alive=True
# p1.is_alive=True
# --p1      pid--
# p1.name=Process-1
# p1.pid=3352
# --p2      pid--
# p2.name=xiaoke
# p2.pid=6092
# --p1        ??--
# worker_2,   (4004),    (6092)
# worker_2,     '1.00' 
# worker_1,   (4004),    (3352)
# worker_1,     '2.00' 
# p1.is_alive=False
# p2.is_alive=False


Num 05-->プロセスの2番目の作成方法--自分でクラスを作成し、Processクラスを継承します.
定義:新しいプロセスを作成するには、クラスの方法も使用できます.クラスをカスタマイズしてProcessクラスを継承できます.このクラスをインスタンス化するたびに、このプロセスオブジェクトをインスタンス化するのと同じです.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2017-04-25 16:36:47
# @Author  : xiaoke
from multiprocessing import Process
import time
import os

#  Process 
class Process_Class(Process):
    #  Process     __init__  ,              ,
    #           ,             Process ,
    #                      ,
    #                Process.__init__  ,         
    def __init__(self,interval):
        Process.__init__(self)
        #        
        self.interval = interval

    #   Process  run()  
    def run(self):
        print("   (%s)     ,    (%s)"%(os.getpid(),os.getppid()))
        t_start = time.time()
        time.sleep(self.interval)
        t_stop = time.time()
        print("   (%s)    ,  %0.2f "%(os.getpid(),t_stop-t_start))

if __name__=="__main__":
    t_start = time.time()
    print("      (%s)"%os.getpid())        
    p1 = Process_Class(2)
    #      target   Process   start()  ,         run()  ,       p1.run()
    p1.start()
    p1.join()
    t_stop = time.time()
    print("   (%s)    ,  %0.2f "%(os.getpid(),t_stop-t_start))
#    :
#       (14736)
#    (4292)     ,    (14736)
#    (4292)    ,  2.00 
#    (14736)    ,  2.11 

Num 06-->プロセスプール--Pool
作成するサブプロセスの数が少ない場合は、multiprocessingのProcessを直接利用して複数のプロセスを動的に生成できます.しかし、100~数千のターゲットであれば、手動でプロセスを作成する作業量が大きく、multiprocessingモジュールが提供するPoolメソッドを使用することができます.
Poolを初期化すると、最大プロセス数を指定できます.新しいリクエストがPoolにコミットされたとき、プールがまだいっぱいでない場合、リクエストを実行するための新しいプロセスが作成されます.ただし、プール内のプロセス数が指定した最大値に達した場合、要求はプール内のプロセスが終了するまで待機し、新しいプロセスが作成されます.
#        
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017/4/27 8:35
# @Author  : xiaoke
# @Site    : 
# @File    : Test33.py
# @Software: PyCharm Community Edition

from multiprocessing import Pool
import os, time, random


def worker(num):
    t_start = time.time()
    print("   -%s    ,    %d" % (num, os.getpid()))
    # random.random()    0~1      
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print("   -%s    ,  %0.2f" % (num, t_stop - t_start))


def main():
    #        ,     5
    p = Pool(3)
    for i in range(10):
        # Pool.apply_async(      ,(          ,))
        #      ,                    
        p.apply_async(worker, (i,))

    m_start = time.time()
    print("----start----")
    p.close()  #      ,   p        
    p.join()  #   p          ,    close    
    print("-----end-----")
    m_stop = time.time()
    print("       ,  %0.2f" % (m_stop - m_start))


if __name__ == '__main__':
    main()
#     :
# ----start----
#    -0    ,    3360
#    -1    ,    16084
#    -2    ,    12580
#    -1    ,  0.05
#    -3    ,    16084
#    -3    ,  0.80
#    -4    ,    16084
#    -2    ,  1.63
#    -5    ,    12580
#    -0    ,  1.80
#    -6    ,    3360
#    -4    ,  1.55
#    -7    ,    16084
#    -7    ,  0.07
#    -8    ,    16084
#    -8    ,  0.05
#    -9    ,    16084
#    -5    ,  1.01
#    -6    ,  1.10
#    -9    ,  1.57
# -----end-----
#        ,  4.26



multiprocessing.Pool共通関数解析:
apply_async(func[,args[,kwds]):非ブロック方式を使用してfuncを呼び出す(並列実行、ブロック方式は前のプロセスの終了を待たなければ次のプロセスを実行できない)、argsはfuncに渡されるパラメータリスト、kwdsはfuncに渡されるキーワードパラメータリストである.
apply(func[,args[,kwds]):ブロック方式でfuncを呼び出す
close():Poolを閉じて、新しいタスクを受け入れないようにします.
terminate():タスクが完了するかどうかにかかわらず、すぐに終了します.
join():メインプロセスがブロックされ、サブプロセスの終了を待つには、closeまたはterminateの後に使用する必要があります.
applyブロック方式を採用
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017/4/27 8:35
# @Author  : xiaoke
# @Site    : 
# @File    : Test33.py
# @Software: PyCharm Community Edition

from multiprocessing import Pool
import os, time, random


def worker(num):
    t_start = time.time()
    print("   -%s    ,    %d" % (num, os.getpid()))
    # random.random()    0~1      
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print("   -%s    ,  %0.2f" % (num, t_stop - t_start))


def main():
    #        ,     5
    p = Pool(3)
    for i in range(10):
        # Pool.apply_async(      ,(          ,))
        #      ,                    
        p.apply(worker, (i,))

    m_start = time.time()
    print("----start----")
    p.close()  #      ,   p        
    p.join()  #   p          ,    close    
    print("-----end-----")
    m_stop = time.time()
    print("       ,  %0.2f" % (m_stop - m_start))


if __name__ == '__main__':
    main()
#     :
#    -0    ,    4464
#    -0    ,  1.75
#    -1    ,    11640
#    -1    ,  1.33
#    -2    ,    8756
#    -2    ,  1.86
#    -3    ,    4464
#    -3    ,  0.70
#    -4    ,    11640
#    -4    ,  1.29
#    -5    ,    8756
#    -5    ,  0.69
#    -6    ,    4464
#    -6    ,  0.33
#    -7    ,    11640
#    -7    ,  1.83
#    -8    ,    8756
#    -8    ,  1.58
#    -9    ,    4464
#    -9    ,  1.37
# ----start----
# -----end-----
#        ,  0.08


Num 07-->プロセス間の通信--Queue
プロセス(Process)間で通信を必要とする時間があり、オペレーティングシステムはプロセス間の通信を実現するための多くのメカニズムを提供しています.Queue、Pipesなどです.Queue自体はメッセージキューです.
Test 01-->まず簡単なケースを見てみましょう.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017/4/27 8:35
# @Author  : xiaoke
# @Site    : 
# @File    : Test33.py
# @Software: PyCharm Community Edition

from multiprocessing import Queue

q = Queue(3)  #      Queue  ,       put  
q.put("  1")
q.put("  2")
print(q.full())  # False
q.put("  3")
print(q.full())  # True

#            try      ,   try   3       ,   Try       
try:
    q.put("  4", True, 3)
except:
    print("      ,      :%s" % q.qsize())

try:
    q.put_nowait("  4")
except:
    print("      ,      :%s" % q.qsize())

#      ,           ,   
if not q.full():
    q.put_nowait("  4")

#      ,           ,   
if not q.empty():
    for i in range(q.qsize()):
        # print("    :%s" % q.get())
        print("    :%s" % q.get_nowait())
#    :
# False
# True
#       ,      :3
#       ,      :3
#     :  1
#     :  2
#     :  3


上記のコードについて説明します.
Queue()オブジェクトを初期化する場合(例えば、q=Queue())、カッコに最大受信可能なメッセージ数が指定されていない場合、または数が負の値である場合、許容可能なメッセージ数に上限がない(メモリの終端まで).
Queue.qsize():現在のキューに含まれるメッセージの数を返します.
Queue.Empty():キューが空の場合はTrueを返し、逆にFalseを返します.
Queue.full():キューがいっぱいになったら、Trueを返し、逆にFalseを返します.
Queue.get([block[,timeout]):キュー内のメッセージを取得し、キューから削除します.blockのデフォルト値はTrueです.1)blockがデフォルト値を使用し、timeout(単位秒)が設定されていない場合、メッセージ列が空の場合、プログラムはブロックされ(読み取り状態に停止)、メッセージ列からメッセージが読み出されるまで、timeoutが設定されている場合、timeout秒が待機し、メッセージがまだ読み出されていない場合、「Queue.Empty」異常が投げ出される.2)block値がFalseの場合、メッセージ列が空の場合、すぐに「Queue.Empty」異常が投げ出される.
Queue.get_nowait():かなりQueue.get(False);
Queue.put(item,[block[,timeout]):itemメッセージをキューに書き込み、blockのデフォルト値はTrue;1)ブロックがデフォルト値を使用し、timeout(単位秒)が設定されていない場合、メッセージ列に書き込み可能なスペースがない場合、プログラムはブロックされ(書き込み状態に止まり)、メッセージ列からスペースが出るまで、timeoutが設定されている場合、timeout秒が待機し、まだスペースがない場合、「Queue.Full」異常が放出される.2)block値がFalseの場合、メッセージ列に書き込み可能なスペースがなければ、すぐに「Queue.Full」異常が投げ出される.
Queue.put_nowait(item):かなりQueue.put(item, False);
Test 02-->親プロセスで2つのサブプロセスを作成し、1つはQueueにデータを書き、1つはQueueからデータを読みます.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017/4/27 8:35
# @Author  : xiaoke
# @Site    : 
# @File    : Test33.py
# @Software: PyCharm Community Edition

from multiprocessing import Process, Queue
import os, time, random


#      
def write(q):
    for value in ['A', 'B', 'C', 'quit']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


#      
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            #            ,         ,    
            if value == "quit":
                break
            time.sleep(random.random())


if __name__ == '__main__':
    #      Queue,        :
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    #      pw,  :
    pw.start()
    #   pw  :
    pw.join()

    #      pr,  :
    pr.start()
    pr.join()

    print('           ')
    print("pw  is   alive:%s" % pw.is_alive())
    print("pr  is   alive:%s" % pr.is_alive())

#     :
# Put A to queue...
# Put B to queue...
# Put C to queue...
# Put quit to queue...
# Get A from queue.
# Get B from queue.
# Get C from queue.
# Get quit from queue.
#            
# pw  is   alive:False
# pr  is   alive:False


Test 03-->プロセスプールPoolのQueueでプロセス間の通信を行う
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : xiaoke
# @Site    : 
# @File    : Test33.py
# @Software: PyCharm Community Edition
from multiprocessing import Pool, Queue, Manager

import os
import time
import random


def task_write(q):
    for s in ('hello', 'python', 'world', 'quit'):
        q.put(s)  #         

        print('%s           :%s' % (os.getpid(), s))

        time.sleep(random.random() * 2)

    print('%s       ' % os.getpid())


def task_read(q):
    while True:

        msg = q.get()  #           
        print('%s           :%s' % (os.getpid(), msg))

        if msg == "quit":
            break

            time.sleep(random.random() * 2)

    print('%s       ' % os.getpid())


def main():
    # 1.        
    # q = Queue() #        
    # Manger().Queue()            
    q = Manager().Queue()

    # 2.     ,       
    my_pool = Pool(2)

    # 3.    
    #        
    my_pool.apply(task_write, args=(q,))
    my_pool.apply(task_read, args=(q,))
    #         
    # my_pool.apply_async(task_write, args=(q,))
    # my_pool.apply_async(task_read, args=(q,))

    # 4.     
    my_pool.close()

    # 5.        
    my_pool.join()


if __name__ == "__main__":
    main()


#           :
# 7380           :hello
# 11256           :hello
# 7380           :python
# 11256           :python
# 7380           :world
# 11256           :world
# 7380           :quit
# 11256           :quit
# 11256       
# 7380       

#          :
# 96           :hello
# 96           :python
# 96           :world
# 96           :quit
# 96       
# 12412           :hello
# 12412           :python
# 12412           :world
# 12412           :quit
# 12412