Python 3同時プログラミング-マルチスレッドthreading
10125 ワード
目次
1:スレッドの作成
1.1:Threadクラス作成スレッド
1.2:Threadクラスを継承するサブクラスの作成
2:スレッドの同期
2.1:ロック:ロック
2.2:デッドロック
2.3:再帰ロック:RLock
2.4:信号量:BoundedSemaphore
2.5:イベント:Event
2.6:スレッドプール
2.6.1:submitメソッド
2.6.2:map方法
同じプロセスの各スレッド間では、プライマリスレッドのアドレス空間と各種リソースを共有できます.
1:スレッドの作成
1.1:Threadクラス作成スレッド
1.2:Threadクラスを継承するサブクラスの作成
2:スレッドの同期
同じプロセスのスレッドリソースは共有されますが、共有データの操作は安全ではありません.そのため、同期操作が必要です.
2.1:ロック:ロック
ロックの注釈を外して、実行結果を見てください.
2.2:デッドロック
デッドロックとは、複数のロックがあり、複数のスレッドの中で互いに相手を待つロックです.
例:
2つのロックがあります:ロックA、ロックB;
スレッドA、スレッドBの2つのスレッドがあります.
スレッドAではロックAが取得され、スレッドBではロックBが取得され、その後、スレッドAではロックBが取得されるが、このときロックBはスレッドBで占有され、スレッドAでは待つしかない.オンラインスレッドBでは、またロックAを取得するが、ロックAはスレッドAに占有されており、このときスレッドBは待つしかない.このようにして,スレッドA,スレッドBはそれぞれ相手が所望のロックを解放するのを待っている.
2.3:再帰ロック:RLock
2.4:信号量:BoundedSemaphore
反発ロックは、同時に1つのスレッドがデータを変更することを許可し、信号量は同時に複数のスレッドが同時に実行することを許可する.
2.5:イベント:Event
2.6:スレッドプール
2.6.1:submitメソッド
2.6.2:map方法
1:スレッドの作成
1.1:Threadクラス作成スレッド
1.2:Threadクラスを継承するサブクラスの作成
2:スレッドの同期
2.1:ロック:ロック
2.2:デッドロック
2.3:再帰ロック:RLock
2.4:信号量:BoundedSemaphore
2.5:イベント:Event
2.6:スレッドプール
2.6.1:submitメソッド
2.6.2:map方法
同じプロセスの各スレッド間では、プライマリスレッドのアドレス空間と各種リソースを共有できます.
1:スレッドの作成
1.1:Threadクラス作成スレッド
# -*- coding: utf-8 -*-
from threading import Thread
import os
import time
def func(index,dic):
print(f' {index}; id={os.getpid()}')
dic['cnt'] += 1
count = 0
while True:
time.sleep(1)
if count > 2:
break
count += 1
dic['cnt'] += 1
print(f' {index};cnt=[{dic["cnt"]}]')
print(f' {index};cnt=[{dic["cnt"]}], ')
def main():
dic = {}
dic['cnt'] = 0
t_list = []
for i in range(5):
t = Thread(target=func, args=(i, dic,)) # , dic
# t.setDaemon(True) # False; True , ,
t.start()
# print(t.getName()) #
t_list.append(t)
print(f' , id={os.getpid()}')
for t in t_list:
t.join() # , 。
print(f' ;cnt=[{dic["cnt"]}], ')
if __name__ == '__main__':
main()
1.2:Threadクラスを継承するサブクラスの作成
# -*- coding: utf-8 -*-
from threading import Thread
import os
import time
class MyThread(Thread):
def __init__(self, index,dic):
super().__init__()
self.index = index
self.dic = dic
def run(self): # start()
print(f' {self.index}; id={os.getpid()}')
self.dic['cnt'] += 1
count = 0
while True:
time.sleep(1)
if count > 2:
break
count += 1
self.dic['cnt'] += 1
print(f' {self.index};cnt=[{self.dic["cnt"]}]')
print(f' {self.index};cnt=[{self.dic["cnt"]}], ')
def main():
dic = {}
dic['cnt'] = 0
t_list = []
for i in range(5):
t = MyThread(i, dic)
# t.setDaemon(True) # False; True , ,
t.start()
# print(t.getName()) #
t_list.append(t)
print(f' , id={os.getpid()}')
for t in t_list:
t.join() # , 。
print(f' ;cnt=[{dic["cnt"]}], ')
if __name__ == '__main__':
main()
2:スレッドの同期
同じプロセスのスレッドリソースは共有されますが、共有データの操作は安全ではありません.そのため、同期操作が必要です.
2.1:ロック:ロック
# -*- coding: utf-8 -*-
from threading import Thread
from multiprocessing import Lock
import os
import time
def func(index,dic,lock):
print(f' {index}; id={os.getpid()}')
while True:
try:
lock.acquire()
cnt = dic['cnt']
time.sleep(0.0001) # ,
if cnt > 0:
dic['cnt'] -= 1
print(f' {index}; :{cnt} ; ={dic["cnt"]};')
else:
lock.release()
break
lock.release()
except:
break
print(f' {index};cnt=[{dic["cnt"]}], ')
def main():
lock = Lock()
dic = {}
dic['cnt'] = 20
t_list = []
for i in range(5):
t = Thread(target=func, args=(i, dic,lock,)) # , dic
# t.setDaemon(True) # False; True , ,
t.start()
# print(t.getName()) #
t_list.append(t)
print(f' , id={os.getpid()}')
for t in t_list:
t.join() # , 。
print(f' ;cnt=[{dic["cnt"]}], ')
if __name__ == '__main__':
main()
ロックの注釈を外して、実行結果を見てください.
# -*- coding: utf-8 -*-
from threading import Thread
from multiprocessing import Lock
import os
import time
def func(index,dic,lock):
print(f' {index}; id={os.getpid()}')
while True:
try:
# lock.acquire()
cnt = dic['cnt']
time.sleep(0.0001) # ,
if cnt > 0:
dic['cnt'] -= 1
print(f' {index}; :{cnt} ; ={dic["cnt"]};')
else:
# lock.release()
break
# lock.release()
except:
break
print(f' {index};cnt=[{dic["cnt"]}], ')
def main():
lock = Lock()
dic = {}
dic['cnt'] = 20
t_list = []
for i in range(5):
t = Thread(target=func, args=(i, dic,lock,)) # , dic
# t.setDaemon(True) # False; True , ,
t.start()
# print(t.getName()) #
t_list.append(t)
print(f' , id={os.getpid()}')
for t in t_list:
t.join() # , 。
print(f' ;cnt=[{dic["cnt"]}], ')
if __name__ == '__main__':
main()
2.2:デッドロック
デッドロックとは、複数のロックがあり、複数のスレッドの中で互いに相手を待つロックです.
例:
2つのロックがあります:ロックA、ロックB;
スレッドA、スレッドBの2つのスレッドがあります.
スレッドAではロックAが取得され、スレッドBではロックBが取得され、その後、スレッドAではロックBが取得されるが、このときロックBはスレッドBで占有され、スレッドAでは待つしかない.オンラインスレッドBでは、またロックAを取得するが、ロックAはスレッドAに占有されており、このときスレッドBは待つしかない.このようにして,スレッドA,スレッドBはそれぞれ相手が所望のロックを解放するのを待っている.
# -*- coding: utf-8 -*-
from threading import Thread
from multiprocessing import Lock
import os
import time
from threading import Thread,Lock
import time
def funcA(lockA,lockB):
print(" funcA: A ")
lockA.acquire()
time.sleep(0.2)
print(" funcA: :A ")
print(" funcA: B ")
lockB.acquire() # B funcB , funcB A
print(" funcA: :B ")
lockB.release()
lockA.release()
def funcB(lockA,lockB):
print(" funcB: B ")
lockB.acquire()
time.sleep(0.2)
print(" funcB: :B ")
print(" funcB: A ") # A funcA
lockA.acquire() # A funcA , funcA B
print(" funcB: :A ")
lockA.release()
lockB.release()
def main():
lockA = Lock()
lockB = Lock()
t_a = Thread(target=funcA, args=(lockA, lockB,))
t_a.start()
t_b = Thread(target=funcB, args=(lockA, lockB,))
t_b.start()
print(" ")
if __name__ == "__main__":
main()
2.3:再帰ロック:RLock
# -*- coding: utf-8 -*-
from threading import Thread
from multiprocessing import Lock
import os
import time
from threading import Thread,RLock #
import time
def funcA(lockA,lockB):
print(" funcA: A ")
lockA.acquire()
time.sleep(0.2)
print(" funcA: :A ")
print(" funcA: B ")
lockB.acquire()
print(" funcA: :B ")
lockB.release()
lockA.release()
def funcB(lockA,lockB):
print(" funcB: B ")
lockB.acquire()
time.sleep(0.2)
print(" funcB: :B ")
print(" funcB: A ")
lockA.acquire()
print(" funcB: :A ")
lockA.release()
lockB.release()
def main():
lockA = lockB = RLock()
t_a = Thread(target=funcA, args=(lockA, lockB,))
t_a.start()
t_b = Thread(target=funcB, args=(lockA, lockB,))
t_b.start()
print(" ")
if __name__ == "__main__":
main()
2.4:信号量:BoundedSemaphore
反発ロックは、同時に1つのスレッドがデータを変更することを許可し、信号量は同時に複数のスレッドが同時に実行することを許可する.
# -*- coding: utf-8 -*-
from threading import Thread,BoundedSemaphore,active_count
import time
def func(index, semaphore):
semaphore.acquire() #
print(f" :{index} ..., :{active_count()}")
time.sleep(5) # ,
semaphore.release() #
def main():
semaphore = BoundedSemaphore(3) # 3
for i in range(20):
t = Thread(target=func, args=(i, semaphore))
t.start()
print(f" : :{active_count()}")
if __name__ == '__main__':
main()
2.5:イベント:Event
# -*- coding: utf-8 -*-
import time
from threading import Thread,Event
def light(e):
while 1:
print(' :')
time.sleep(5)
e.set() # event True ;
print(' :')
time.sleep(3)
e.clear() # event False。
def car(index,e):
if e.is_set(): # event ;
# True
print(f' ,{index}: !')
else:
print(f' {index}: !')
e.wait() # event.isSet()==False
print(f' ,{index}: !')
def main():
e = Event()
lgh = Thread(target=light, args=(e,))
lgh.start()
cnt = 0
while 1:
time.sleep(1) # 1 car
t1 = Thread(target=car, args=(cnt, e,))
t1.start()
cnt += 1
if __name__ == '__main__':
main()
2.6:スレッドプール
2.6.1:submitメソッド
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
from threading import current_thread
def func(index):
# print(index,current_thread().ident)
time.sleep(0.1)
return [index,index**2]
if __name__ == "__main__":
t_p = ThreadPoolExecutor(max_workers=6)
t_ret_list = []
for i in range(20):
t = t_p.submit(func, i)
t_ret_list.append(t)
for ret in t_ret_list:
print(ret.result())
2.6.2:map方法
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
from threading import current_thread
def func(index):
# print(index,current_thread().ident)
time.sleep(0.1)
return [index,index**2]
if __name__ == "__main__":
t_p = ThreadPoolExecutor(max_workers=6)
map_ret = t_p.map(func,range(20))
print(map_ret)
for ret in map_ret:
print(ret)