Python 36マルチスレッド、マルチプロセスの使用シーン
16942 ワード
マルチスレッドとマルチプロセスの使用シーン
ioオペレーションはCPUを占有しない(ハードディスクから、ネットワークから、メモリからデータを読み取るのはio)計算はCPUを占有する(例えば1+1計算)
pythonのスレッドは偽のスレッドであり、異なるスレッド間の切り替えはリソースを消費する必要があります.スレッドのコンテキストを格納する必要があるため、絶えず切り替えるとリソースが消費されます.
pythonマルチスレッドはio操作が密集しているタスク(socket serverネットワークが同時に発生するなど)に適している.pythonマルチスレッドはcpu密集操作型のタスクに適しておらず、主にcpuを使用して大量の数学計算などの計算を行う.では、cpu密集型のタスクがあればどうすればいいのか、マルチプロセスで操作できます(マルチスレッドではありません).CPUに8コアがある場合、コアごとに1つのプロセスを使用することができ、各プロセスは1つのスレッドで計算することができます.プロセス間でgilロックを使用する必要はありません.プロセスは独立しており、データは共有されません.プロセスは複数作成できますが、8コアCPUでは同時に8つのタスクしか操作できません.
マルチプロセス
プロセス間通信
デフォルトのプロセス間ではデータは共有されません.必ず相互訪問を実現するにはQueueで実現できます.このQueueはスレッド内のQueueの使用方法と同じですが、スレッド内のQueueはスレッド間でしか使用できません.
プロセス間のデータ相互作用と共有
プロセス同期
プロセス内にもロックがあります
プロセスプール
マルチプロセスを実行すると、サブプロセスはメインプロセスから完全なデータをコピーし、1つ、10つのプロセスはまだ感じていないかもしれませんが、100または1000以上のプロセスがある場合、特にオーバーヘッドが大きくなり、マルチプロセスがカートン現象を実行していることが明らかになります.
プロセスプールでは、同じ時間にCPU上で実行できるプロセスの数を設定できます.
コールバック
転載先:https://blog.51cto.com/daimalaobing/2087354
ioオペレーションはCPUを占有しない(ハードディスクから、ネットワークから、メモリからデータを読み取るのはio)計算はCPUを占有する(例えば1+1計算)
pythonのスレッドは偽のスレッドであり、異なるスレッド間の切り替えはリソースを消費する必要があります.スレッドのコンテキストを格納する必要があるため、絶えず切り替えるとリソースが消費されます.
pythonマルチスレッドはio操作が密集しているタスク(socket serverネットワークが同時に発生するなど)に適している.pythonマルチスレッドはcpu密集操作型のタスクに適しておらず、主にcpuを使用して大量の数学計算などの計算を行う.では、cpu密集型のタスクがあればどうすればいいのか、マルチプロセスで操作できます(マルチスレッドではありません).CPUに8コアがある場合、コアごとに1つのプロセスを使用することができ、各プロセスは1つのスレッドで計算することができます.プロセス間でgilロックを使用する必要はありません.プロセスは独立しており、データは共有されません.プロセスは複数作成できますが、8コアCPUでは同時に8つのタスクしか操作できません.
マルチプロセス
import multiprocessing
import time
def run(name):
time.sleep(2)
print ('heelo',name)
if __name__ == '__main__':
for i in range(10): # 10
p = multiprocessing.Process(target=run,args=('bob%s' %i,))
p.start()
:
heelo bob1
heelo bob0
heelo bob2
heelo bob3
heelo bob5
heelo bob4
heelo bob6
heelo bob7
heelo bob8
heelo bob9
##2 , CPU, ; N , CPU 。
import multiprocessing
import time,threading
def thread_run():
print (threading.get_ident()) #get_ident id
def run(name):
time.sleep(2)
print ('heelo',name)
t = threading.Thread(target=thread_run,) # 1
t.start()
if __name__ == '__main__':
for i in range(10): # 10
p = multiprocessing.Process(target=run,args=('bob%s' %i,))
p.start()
:
heelo bob0
16684
heelo bob1
15052
heelo bob2
15260
heelo bob3
6192
heelo bob4
6748
heelo bob7
13980
heelo bob5
6628
heelo bob6
3904
heelo bob9
2328
heelo bob8
17072
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid()) # id
print('process id:', os.getpid()) # id
print("
")
def f(name):
info('\033[31;1mfunction f\033[0m')
print('hello', name)
if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m') ##
# p = Process(target=f, args=('bob',))
# p.start()
# p.join()
:
main process line
module name: __main__
parent process: 1136 # ID, pycharm
process id: 16724 # python
## 。
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid()) # id
print('process id:', os.getpid()) # id
print("
")
def f(name):
info('\033[31;1mcalled from child process function f\033[0m')
print('hello', name)
if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p = Process(target=f, args=('bob',)) #
p.start() #
# p.join()
:
main process line
module name: __main__
parent process: 1136 # pycharm
process id: 14684 # python
called from child process function f
module name: __mp_main__
parent process: 14684 # python (1136 )
process id: 15884 #python ( 14684) 15884
## ( )
hello bob
プロセス間通信
デフォルトのプロセス間ではデータは共有されません.必ず相互訪問を実現するにはQueueで実現できます.このQueueはスレッド内のQueueの使用方法と同じですが、スレッド内のQueueはスレッド間でしか使用できません.
import queue
import threading
def f():
q.put([42,None,'heelo'])
if __name__ == '__main__':
q = queue.Queue()
p = threading.Thread(target=f,)
p.start()
print (q.get())
p.join()
:
[42, None, 'heelo']
## put , get , 。
import queue
from multiprocessing import Process
def f():
q.put([42,None,'heelo']) # q
if __name__ == '__main__':
q = queue.Queue() # q
p = Process(target=f,)
## ; , 。
## , p def f() q 。
p.start()
print (q.get())
p.join()
:
Process Process-1:
Traceback (most recent call last):
File "D:\python3.6.4\lib\multiprocessing\process.py", line 258, in _bootstrap
self.run()
File "D:\python3.6.4\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "E:\python\ \A3.py", line 7, in f
q.put([42,None,'heelo'])
NameError: name 'q' is not defined
## , q
import queue
from multiprocessing import Process
def f(qq):
qq.put([42,None,'heelo'])
if __name__ == '__main__':
q = queue.Queue()
p = Process(target=f,args=(q,)) # q
p.start()
print (q.get())
p.join()
:
Traceback (most recent call last):
File "E:/python/ /A3.py", line 13, in
p.start()
File "D:\python3.6.4\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "D:\python3.6.4\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "D:\python3.6.4\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "D:\python3.6.4\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "D:\python3.6.4\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
## q , , , 。
## q , q , q。
from multiprocessing import Process,Queue
## Queue ; queue
## Queue multiprocessing
def f(qq):
qq.put([42,None,'heelo'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f,args=(q,)) # q
p.start()
print (q.get()) # get
p.join()
:
[42, None, 'heelo']
## get put ; , 。
## 。 q , q , q ; get put , , put pickle , ( )。 , 。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello from child1']) # parent_conn
conn.close() #
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
## 。 , , 。
## 。
p = Process(target=f, args=(child_conn,)) #child_conn , send parent_conn
p.start()
print(parent_conn.recv()) #parent_conn , recv
p.join()
:
[42, None, 'hello from child1']
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello from child1'])
conn.send([42, None, 'hello from child2']) #
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
:
[42, None, 'hello from child1']
##
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello from child1'])
conn.send([42, None, 'hello from child2'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
print(parent_conn.recv()) #
p.join()
:
[42, None, 'hello from child1']
[42, None, 'hello from child2']
## ,
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello from child1'])
conn.send([42, None, 'hello from child2']) #
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
print(parent_conn.recv())
print(parent_conn.recv()) # ,
p.join()
:
[42, None, 'hello from child1']
[42, None, 'hello from child2']
## , 。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello from child1'])
conn.send([42, None, 'hello from child2']) #
print (conn.recv()) #
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
print(parent_conn.recv())
parent_conn.send("data from parent_conn") #
p.join()
:
[42, None, 'hello from child1']
[42, None, 'hello from child2']
data from parent_conn
## ( )
プロセス間のデータ相互作用と共有
from multiprocessing import Process, Manager
import os
def f(d, l):
d[1] = '1' # key value
d['2'] = 2
d[0.25] = None
l.append(os.getpid()) # id ; id 。
print(l)
if __name__ == '__main__':
with Manager() as manager: # , manager Manager()
d = manager.dict() #
l = manager.list(range(5)) # ; range(5) 5
p_list = []
for i in range(10): # 10
p = Process(target=f, args=(d, l)) # ,
p.start()
p_list.append(p) #
for res in p_list:
res.join()
print(d) #
print(l) #
:
[0, 1, 2, 3, 4, 15788]
# 0-4 5 ; pid 。
[0, 1, 2, 3, 4, 15788, 1568]
[0, 1, 2, 3, 4, 15788, 1568, 7196]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] # 10 10 pid
{1: '1', '2': 2, 0.25: None} #
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] #
from multiprocessing import Process, Manager
import os
def f(d, l):
d[os.getpid()] = os.getpid()
l.append(os.getpid())
print(l)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() # , pid
l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
:
[0, 1, 2, 3, 4, 2240]
[0, 1, 2, 3, 4, 2240, 10152]
[0, 1, 2, 3, 4, 2240, 10152, 10408]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]
{2240: 2240, 10152: 10152, 10408: 10408, 6312: 6312, 17156: 17156, 6184: 6184, 16168: 16168, 11384: 11384, 15976: 15976, 16532: 16532}
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]
## 、 。
##Manager() , ;
## , 10 , 10 10 。 , copy , , 。
プロセス同期
プロセス内にもロックがあります
from multiprocessing import Process, Lock # multiprocessing Lock
def f(l, i):
l.acquire() #
print('hello world', i)
l.release() #
if __name__ == '__main__':
lock = Lock() #
for num in range(10): # 10
Process(target=f, args=(lock, num)).start() #
:
hello world 1
hello world 4
hello world 0
hello world 3
hello world 2
hello world 5
hello world 6
hello world 8
hello world 7
hello world 9
## 10 , , 。
## , , ; hello world x, ( hello wo), ( hello wohello world0201 )。 。
プロセスプール
マルチプロセスを実行すると、サブプロセスはメインプロセスから完全なデータをコピーし、1つ、10つのプロセスはまだ感じていないかもしれませんが、100または1000以上のプロセスがある場合、特にオーバーヘッドが大きくなり、マルチプロセスがカートン現象を実行していることが明らかになります.
プロセスプールでは、同じ時間にCPU上で実行できるプロセスの数を設定できます.
from multiprocessing import Process, Pool
# multiprocessing pool
import time,os
def Foo(i):
time.sleep(2)
print("in process",os.getpid()) # id
return i + 100
def Bar(arg):
print('-->exec done:', arg)
if __name__ == '__main__': ## .py , ; .py , .py , 。 , 。。
pool = Pool(5) # 5
for i in range(10): # 10 , pool , 5 (), , , 2 。
# pool.apply_async(func=Foo, args=(i,), callback=Bar)
pool.apply(func=Foo, args=(i,)) #pool.apply pool
print('end') #
pool.close() # pool (close join , close )
pool.join() # , , 。
:
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
end
## , pool.apply。 pool.apply 。
from multiprocessing import Process, Pool
import time,os
def Foo(i):
time.sleep(2)
print("in process",os.getpid())
return i + 100
def Bar(arg):
print('-->exec done:', arg)
if __name__ == '__main__':
pool = Pool(5)
for i in range(10):
pool.apply_async(func=Foo, args=(i,))
## pool.apply_async
print('end')
pool.close()
# pool.join()
:
end
## print('end') , , , pool.close() ,close 。
## , pool.join()
from multiprocessing import Process, Pool
import time,os
def Foo(i):
time.sleep(2)
print("in process",os.getpid())
return i + 100
def Bar(arg):
print('-->exec done:', arg)
if __name__ == '__main__':
pool = Pool(5)
for i in range(10):
pool.apply_async(func=Foo, args=(i,))
print('end')
pool.close()
pool.join()
:
end
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068
## ,5 5 。
コールバック
from multiprocessing import Process, Pool
import time,os
def Foo(i):
time.sleep(2)
print("in process",os.getpid())
return i + 100
def Bar(arg):
print('-->exec done:', arg,os.getpid())
if __name__ == '__main__':
pool = Pool(5)
print (" :",os.getpid()) # id
for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)
##callback , func=Foo , callback=Bar( )。
## , , ; , 。
## 、 , ! , 10 , , , 。
## , , , , , 。
print('end')
pool.close()
pool.join()
:
: 12776 # 12766
end
in process 7496
-->exec done: 100 12776 #
in process 3324
-->exec done: 101 12776
in process 16812
-->exec done: 102 12776
in process 10876
-->exec done: 103 12776
in process 8200
-->exec done: 104 12776
in process 7496
-->exec done: 105 12776
in process 3324
-->exec done: 106 12776
in process 16812
-->exec done: 107 12776
in process 10876
-->exec done: 108 12776
in process 8200
-->exec done: 109 12776
転載先:https://blog.51cto.com/daimalaobing/2087354