Python 36マルチスレッド、マルチプロセスの使用シーン

16942 ワード

マルチスレッドとマルチプロセスの使用シーン
ioオペレーションはCPUを占有しない(ハードディスクから、ネットワークから、メモリからデータを読み取るのはio)計算はCPUを占有する(例えば1+1計算)
pythonのスレッドは偽のスレッドであり、異なるスレッド間の切り替えはリソースを消費する必要があります.スレッドのコンテキストを格納する必要があるため、絶えず切り替えるとリソースが消費されます.
pythonマルチスレッドはio操作が密集しているタスク(socket serverネットワークが同時に発生するなど)に適している.pythonマルチスレッドはcpu密集操作型のタスクに適しておらず、主にcpuを使用して大量の数学計算などの計算を行う.では、cpu密集型のタスクがあればどうすればいいのか、マルチプロセスで操作できます(マルチスレッドではありません).CPUに8コアがある場合、コアごとに1つのプロセスを使用することができ、各プロセスは1つのスレッドで計算することができます.プロセス間でgilロックを使用する必要はありません.プロセスは独立しており、データは共有されません.プロセスは複数作成できますが、8コアCPUでは同時に8つのタスクしか操作できません.
マルチプロセス
     

import multiprocessing
import time

def run(name):
    time.sleep(2)
    print ('heelo',name)

if __name__ == '__main__':

    for i in range(10): #  10   
        p = multiprocessing.Process(target=run,args=('bob%s' %i,))
        p.start()

    :
heelo bob1
heelo bob0
heelo bob2
heelo bob3
heelo bob5
heelo bob4
heelo bob6
heelo bob7
heelo bob8
heelo bob9

##2         ,   CPU,           ;             N        ,  CPU     。

import multiprocessing
import time,threading

def thread_run():
    print (threading.get_ident()) #get_ident      id

def run(name):
    time.sleep(2)
    print ('heelo',name)
    t = threading.Thread(target=thread_run,)    #         1   
    t.start()

if __name__ == '__main__':

    for i in range(10):     #  10   
        p = multiprocessing.Process(target=run,args=('bob%s' %i,))
        p.start()

    :
heelo bob0
16684
heelo bob1
15052
heelo bob2
15260
heelo bob3
6192
heelo bob4
6748
heelo bob7
13980
heelo bob5
6628
heelo bob6
3904
heelo bob9
2328
heelo bob8
17072

import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #      id
    print('process id:', os.getpid())   #     id
    print("

") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') ## # p = Process(target=f, args=('bob',)) # p.start() # p.join() : main process line module name: __main__ parent process: 1136 # ID, pycharm process id: 16724 # python ## 。

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #      id
    print('process id:', os.getpid())   #     id
    print("

") def f(name): info('\033[31;1mcalled from child process function f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) # p.start() # # p.join() : main process line module name: __main__ parent process: 1136 # pycharm process id: 14684 # python called from child process function f module name: __mp_main__ parent process: 14684 # python (1136 ) process id: 15884 #python ( 14684) 15884 ## ( ) hello bob

プロセス間通信
デフォルトのプロセス間ではデータは共有されません.必ず相互訪問を実現するにはQueueで実現できます.このQueueはスレッド内のQueueの使用方法と同じですが、スレッド内のQueueはスレッド間でしか使用できません.
  

import queue
import threading

def f():
    q.put([42,None,'heelo'])

if __name__ == '__main__':
    q = queue.Queue()     
    p = threading.Thread(target=f,)

    p.start()

    print (q.get())
    p.join()

    :
[42, None, 'heelo']
##      put    ,      get   ,              。
  

import queue
from multiprocessing import Process

def f():
    q.put([42,None,'heelo'])    #   q     

if __name__ == '__main__':
    q = queue.Queue()   #     q
    p = Process(target=f,)
    ##            ;             ,                 。
    ##       ,   p        def f()  q 。
    p.start()

    print (q.get())
    p.join()

    :
Process Process-1:
Traceback (most recent call last):
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "E:\python\    \A3.py", line 7, in f
    q.put([42,None,'heelo'])    
NameError: name 'q' is not defined

##        ,               q

import queue
from multiprocessing import Process

def f(qq):
    qq.put([42,None,'heelo'])

if __name__ == '__main__':
    q = queue.Queue()
    p = Process(target=f,args=(q,)) #    q     

    p.start()

    print (q.get())
    p.join()

    :

Traceback (most recent call last):
  File "E:/python/    /A3.py", line 13, in 
    p.start()
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "D:\python3.6.4\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "D:\python3.6.4\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "D:\python3.6.4\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "D:\python3.6.4\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

##           q       ,      ,         ,        。
##     q     ,       q   ,     q。

from multiprocessing import Process,Queue
##   Queue     ; queue     
##   Queue   multiprocessing  

def f(qq):
    qq.put([42,None,'heelo'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f,args=(q,)) #    q     

    p.start()

    print (q.get()) #    get      
    p.join()

    :
[42, None, 'heelo']
##     get   put      ;                 ,    。

##              。    q     ,        q    ,          q    ;             get   put      ,                     ,      put      pickle               ,                 (            )。                ,         。
                    

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])  #     parent_conn
    conn.close()    #        

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    ##     。             ,             ,        。
    ##             。
    p = Process(target=f, args=(child_conn,))   #child_conn      ,  send   parent_conn
    p.start()
    print(parent_conn.recv())  #parent_conn   ,  recv  
    p.join()

    :
[42, None, 'hello from child1']

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  #      
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  
    p.join()

    :
[42, None, 'hello from child1']
##                
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())   #       
    p.join()

    :
[42, None, 'hello from child1']
[42, None, 'hello from child2']
##      ,         

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  #      
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    print(parent_conn.recv())   #      ,      
    p.join()

    :
[42, None, 'hello from child1']
[42, None, 'hello from child2']
##      ,           。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  #      
    print (conn.recv()) #    
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    parent_conn.send("data from parent_conn")   #    
    p.join()

    :
[42, None, 'hello from child1']
[42, None, 'hello from child2']
data from parent_conn

##               (       )

プロセス間のデータ相互作用と共有
from multiprocessing import Process, Manager
import os

def f(d, l):
    d[1] = '1'  #  key value     
    d['2'] = 2
    d[0.25] = None

    l.append(os.getpid()) #      id      ;     id    。
    print(l)

if __name__ == '__main__':
    with Manager() as manager:  #     ,  manager    Manager()
        d = manager.dict()  #                    

        l = manager.list(range(5))  #                    ;  range(5)      5   
        p_list = []
        for i in range(10): #  10   
            p = Process(target=f, args=(d, l))  #            ,          
            p.start()
            p_list.append(p)    #            
        for res in p_list:
            res.join()

        print(d)    #              
        print(l)    #              

    :
[0, 1, 2, 3, 4, 15788] 
#            0-4 5   ;           pid     。
[0, 1, 2, 3, 4, 15788, 1568]
[0, 1, 2, 3, 4, 15788, 1568, 7196]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] # 10          10    pid
{1: '1', '2': 2, 0.25: None}    #       
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] #       

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()

    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #       ,  pid      

        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)

    :
[0, 1, 2, 3, 4, 2240]
[0, 1, 2, 3, 4, 2240, 10152]
[0, 1, 2, 3, 4, 2240, 10152, 10408]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]
{2240: 2240, 10152: 10152, 10408: 10408, 6312: 6312, 17156: 17156, 6184: 6184, 16168: 16168, 11384: 11384, 15976: 15976, 16532: 16532}
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]

##                  、     。
##Manager()   ,               ;
##            ,    10   ,   10    10   。       ,  copy     ,                ,          。

プロセス同期
プロセス内にもロックがあります

from multiprocessing import Process, Lock   # multiprocessing  Lock   

def f(l, i):
    l.acquire()     #        
    print('hello world', i)
    l.release()     #   

if __name__ == '__main__':
    lock = Lock()   #   

    for num in range(10):   #  10   
        Process(target=f, args=(lock, num)).start() #              

    :
hello world 1
hello world 4
hello world 0
hello world 3
hello world 2
hello world 5
hello world 6
hello world 8
hello world 7
hello world 9
##       10   ,      ,                   。

##          ,            ,                    ;              hello world x,                      (      hello wo),           (         hello wohello world0201   )。                              。

プロセスプール
マルチプロセスを実行すると、サブプロセスはメインプロセスから完全なデータをコピーし、1つ、10つのプロセスはまだ感じていないかもしれませんが、100または1000以上のプロセスがある場合、特にオーバーヘッドが大きくなり、マルチプロセスがカートン現象を実行していることが明らかになります.
プロセスプールでは、同じ時間にCPU上で実行できるプロセスの数を設定できます.

from  multiprocessing import Process, Pool
# multiprocessing  pool

import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid()) #    id
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

if __name__ == '__main__':  ##                 .py  ,              ;   .py           ,        .py  ,             。                 ,              。。
    pool = Pool(5)  #      5   

    for i in range(10): #  10   ,    pool   ,         5        (),       ,                 ,    2     。
        # pool.apply_async(func=Foo, args=(i,), callback=Bar)
        pool.apply(func=Foo, args=(i,)) #pool.apply       pool

    print('end')    #    
    pool.close()    #  pool      (close   join  ,    close        )
    pool.join()  #               ,    ,        。

    :
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
end

##                  ,          pool.apply。 pool.apply            。
from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

if __name__ == '__main__':
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,))
        ##   pool.apply_async      

    print('end')
    pool.close()
    # pool.join()      

    :
end
##     print('end')  ,           ,               ,   pool.close()     ,close               。
##               ,    pool.join()
from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

if __name__ == '__main__':
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,))

    print('end')
    pool.close()
    pool.join()

    :

end
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068

##       ,5  5       。

コールバック
from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg,os.getpid())

if __name__ == '__main__':
    pool = Pool(5)

    print ("   :",os.getpid())  #     id
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
        ##callback    ,       func=Foo ,    callback=Bar(               )。
        ##                     ,        ,        ;          ,     。
        ##   、              ,          !           , 10             ,                 ,           ,            。
        ##               ,                     ,             ,          ,                  ,          。

    print('end')
    pool.close()
    pool.join()

    :
   : 12776              #    12766
end
in process 7496
-->exec done: 100 12776     #                 
in process 3324
-->exec done: 101 12776
in process 16812
-->exec done: 102 12776
in process 10876
-->exec done: 103 12776
in process 8200
-->exec done: 104 12776
in process 7496
-->exec done: 105 12776
in process 3324
-->exec done: 106 12776
in process 16812
-->exec done: 107 12776
in process 10876
-->exec done: 108 12776
in process 8200
-->exec done: 109 12776

転載先:https://blog.51cto.com/daimalaobing/2087354