Python 3同時プログラミング-マルチスレッドthreading


目次
1:スレッドの作成
1.1:Threadクラス作成スレッド
1.2:Threadクラスを継承するサブクラスの作成
2:スレッドの同期
2.1:ロック:ロック
2.2:デッドロック
2.3:再帰ロック:RLock
2.4:信号量:BoundedSemaphore
2.5:イベント:Event
2.6:スレッドプール
2.6.1:submitメソッド
2.6.2:map方法
同じプロセスの各スレッド間では、プライマリスレッドのアドレス空間と各種リソースを共有できます.
1:スレッドの作成
1.1:Threadクラス作成スレッド
# -*- coding: utf-8 -*-

from threading import Thread

import os
import time

def func(index,dic):
    print(f'  {index};  id={os.getpid()}')

    dic['cnt'] += 1
    count = 0

    while True:
        time.sleep(1)
        if count > 2:
            break
        count += 1
        dic['cnt'] += 1
        print(f'  {index};cnt=[{dic["cnt"]}]')

    print(f'  {index};cnt=[{dic["cnt"]}],  ')


def main():
    dic = {}
    dic['cnt'] = 0
    t_list = []
    for i in range(5):
        t = Thread(target=func, args=(i, dic,))  #          , dic      
        # t.setDaemon(True)  #    False;   True ,      ,       
        t.start()
        # print(t.getName())  #       
        t_list.append(t)

    print(f'   ,  id={os.getpid()}')
    for t in t_list:
        t.join()  #        ,   。

    print(f'   ;cnt=[{dic["cnt"]}],  ')

if __name__ == '__main__':
    main()

1.2:Threadクラスを継承するサブクラスの作成
# -*- coding: utf-8 -*-

from threading import Thread

import os
import time


class MyThread(Thread):
    def __init__(self, index,dic):
        super().__init__()
        self.index = index
        self.dic = dic


    def run(self): #   start()            
        print(f'  {self.index};  id={os.getpid()}')

        self.dic['cnt'] += 1
        count = 0

        while True:
            time.sleep(1)
            if count > 2:
                break
            count += 1
            self.dic['cnt'] += 1
            print(f'  {self.index};cnt=[{self.dic["cnt"]}]')

        print(f'  {self.index};cnt=[{self.dic["cnt"]}],  ')


def main():
    dic = {}
    dic['cnt'] = 0
    t_list = []
    for i in range(5):
        t = MyThread(i, dic)
        # t.setDaemon(True)  #    False;   True ,      ,       
        t.start()
        # print(t.getName())  #       
        t_list.append(t)

    print(f'   ,  id={os.getpid()}')
    for t in t_list:
        t.join()  #        ,   。

    print(f'   ;cnt=[{dic["cnt"]}],  ')

if __name__ == '__main__':
    main()

2:スレッドの同期
同じプロセスのスレッドリソースは共有されますが、共有データの操作は安全ではありません.そのため、同期操作が必要です.
2.1:ロック:ロック
# -*- coding: utf-8 -*-

from threading import Thread
from multiprocessing import Lock
import os
import time

def func(index,dic,lock):

    print(f'  {index};  id={os.getpid()}')
    while True:
        try:
            lock.acquire()
            cnt = dic['cnt']
            time.sleep(0.0001)   #     ,       
            if cnt > 0:
                dic['cnt'] -= 1
                print(f'  {index};   :{cnt}  ;  ={dic["cnt"]};')
            else:
                lock.release()
                break
            lock.release()
        except:
            break

    print(f'  {index};cnt=[{dic["cnt"]}],  ')


def main():
    lock = Lock()
    dic = {}
    dic['cnt'] = 20
    t_list = []
    for i in range(5):
        t = Thread(target=func, args=(i, dic,lock,))  #          , dic      
        # t.setDaemon(True)  #    False;   True ,      ,       
        t.start()
        # print(t.getName())  #       
        t_list.append(t)

    print(f'   ,  id={os.getpid()}')
    for t in t_list:
        t.join()  #        ,   。

    print(f'   ;cnt=[{dic["cnt"]}],  ')

if __name__ == '__main__':
    main()

ロックの注釈を外して、実行結果を見てください.
# -*- coding: utf-8 -*-

from threading import Thread
from multiprocessing import Lock
import os
import time

def func(index,dic,lock):

    print(f'  {index};  id={os.getpid()}')
    while True:
        try:
            # lock.acquire()
            cnt = dic['cnt']
            time.sleep(0.0001)   #     ,       
            if cnt > 0:
                dic['cnt'] -= 1
                print(f'  {index};   :{cnt}  ;  ={dic["cnt"]};')
            else:
                # lock.release()
                break
            # lock.release()
        except:
            break

    print(f'  {index};cnt=[{dic["cnt"]}],  ')


def main():
    lock = Lock()
    dic = {}
    dic['cnt'] = 20
    t_list = []
    for i in range(5):
        t = Thread(target=func, args=(i, dic,lock,))  #          , dic      
        # t.setDaemon(True)  #    False;   True ,      ,       
        t.start()
        # print(t.getName())  #       
        t_list.append(t)

    print(f'   ,  id={os.getpid()}')
    for t in t_list:
        t.join()  #        ,   。

    print(f'   ;cnt=[{dic["cnt"]}],  ')

if __name__ == '__main__':
    main()

2.2:デッドロック
デッドロックとは、複数のロックがあり、複数のスレッドの中で互いに相手を待つロックです.
例:
2つのロックがあります:ロックA、ロックB;
スレッドA、スレッドBの2つのスレッドがあります.
スレッドAではロックAが取得され、スレッドBではロックBが取得され、その後、スレッドAではロックBが取得されるが、このときロックBはスレッドBで占有され、スレッドAでは待つしかない.オンラインスレッドBでは、またロックAを取得するが、ロックAはスレッドAに占有されており、このときスレッドBは待つしかない.このようにして,スレッドA,スレッドBはそれぞれ相手が所望のロックを解放するのを待っている. 
# -*- coding: utf-8 -*-

from threading import Thread
from multiprocessing import Lock
import os
import time


from threading import Thread,Lock
import time


def funcA(lockA,lockB):
    print(" funcA:  A ")
    lockA.acquire()
    time.sleep(0.2)
    print(" funcA:   :A ")
    print(" funcA:  B ")
    lockB.acquire()  # B   funcB    ,  funcB     A    
    print(" funcA:   :B ")
    lockB.release()
    lockA.release()

def funcB(lockA,lockB):
    print(" funcB:  B ")
    lockB.acquire()
    time.sleep(0.2)
    print(" funcB:   :B ")

    print(" funcB:  A ")  # A   funcA
    lockA.acquire()  # A   funcA    ,  funcA     B    
    print(" funcB:   :A ")

    lockA.release()
    lockB.release()

def main():
    lockA = Lock()
    lockB = Lock()

    t_a = Thread(target=funcA, args=(lockA, lockB,))
    t_a.start()

    t_b = Thread(target=funcB, args=(lockA, lockB,))
    t_b.start()

    print("     ")

if __name__ == "__main__":
    main()


2.3:再帰ロック:RLock
# -*- coding: utf-8 -*-

from threading import Thread
from multiprocessing import Lock
import os
import time


from threading import Thread,RLock  #    
import time


def funcA(lockA,lockB):
    print(" funcA:  A ")
    lockA.acquire()
    time.sleep(0.2)
    print(" funcA:   :A ")
    print(" funcA:  B ")
    lockB.acquire()
    print(" funcA:   :B ")
    lockB.release()
    lockA.release()

def funcB(lockA,lockB):
    print(" funcB:  B ")
    lockB.acquire()
    time.sleep(0.2)
    print(" funcB:   :B ")

    print(" funcB:  A ")
    lockA.acquire()
    print(" funcB:   :A ")

    lockA.release()
    lockB.release()

def main():

    lockA = lockB = RLock()

    t_a = Thread(target=funcA, args=(lockA, lockB,))
    t_a.start()

    t_b = Thread(target=funcB, args=(lockA, lockB,))
    t_b.start()

    print("     ")

if __name__ == "__main__":
    main()


2.4:信号量:BoundedSemaphore
反発ロックは、同時に1つのスレッドがデータを変更することを許可し、信号量は同時に複数のスレッドが同時に実行することを許可する.
# -*- coding: utf-8 -*-

from threading import Thread,BoundedSemaphore,active_count
import time

def func(index, semaphore):
    semaphore.acquire()   #  
    print(f"  :{index}    ...,       :{active_count()}")
    time.sleep(5) #       ,    
    semaphore.release()     #  


def main():
    semaphore = BoundedSemaphore(3)  #     3       
    for i in range(20):
        t = Thread(target=func, args=(i, semaphore))
        t.start()

    print(f"   :       :{active_count()}")

if __name__ == '__main__':
    main()

2.5:イベント:Event
# -*- coding: utf-8 -*-
import time
from threading import Thread,Event

def light(e):
    while 1:
        print('     :')
        time.sleep(5)
        e.set()   #   event     True ;
        print('     :')
        time.sleep(3)
        e.clear()  #   event     False。

def car(index,e):
    if e.is_set():  #   event    ;
        #    True
        print(f'         ,{index}:    !')
    else:
        print(f'         {index}:   !')
        e.wait()  #     event.isSet()==False   
        print(f'         ,{index}:    !')

def main():
    e = Event()
    lgh = Thread(target=light, args=(e,))
    lgh.start()
    cnt = 0

    while 1:
        time.sleep(1)  #   1    car
        t1 = Thread(target=car, args=(cnt, e,))
        t1.start()
        cnt += 1

if __name__ == '__main__':
    main()

2.6:スレッドプール
2.6.1:submitメソッド
# -*- coding: utf-8 -*-

from  concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
from threading import current_thread


def func(index):
    # print(index,current_thread().ident)
    time.sleep(0.1)
    return [index,index**2]


if __name__ == "__main__":
    t_p = ThreadPoolExecutor(max_workers=6)
    t_ret_list = []
    for i  in range(20):
        t = t_p.submit(func, i)
        t_ret_list.append(t)

    for ret in t_ret_list:
        print(ret.result())



2.6.2:map方法
# -*- coding: utf-8 -*-

from  concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
from threading import current_thread

def func(index):
    # print(index,current_thread().ident)
    time.sleep(0.1)
    return [index,index**2]

if __name__ == "__main__":
    t_p = ThreadPoolExecutor(max_workers=6)
    map_ret = t_p.map(func,range(20))
    print(map_ret)
    for ret in map_ret:
        print(ret)