python同時プログラミング--プロセス、スレッド、スレッド、ロック、プール、キュー

68551 ワード

文書ディレクトリ
  • オペレーティングシステムの概念
  • プロセス
  • multiprocessingモジュール
  • デーモン
  • マルチプロセスを使用して同時socketを実現するserver
  • ロック
  • 生産者消費者モデル
  • データ共有
  • スレッドthreadingモジュール
  • デーモンスレッドおよびスレッドロック
  • 再帰錠とデッドロック(科学者が麺を食べる)
  • キュー
  • プール
  • コモン
  • geventモジュール
  • asyncioモジュール

  • オペレーティングシステムの概念オペレーティングシステムヒューマンマシン矛盾:cpu 100%ワークI/O操作入出力メモリに対する複数のオペレーティングシステム:1つのプログラムがIOに遭遇すると、cpuを他の人に譲る順序の1つ1つの実行の構想が1台のコンピュータに共通に存在し、その中でパロディプログラムでcpuを実行させた後、もう一つのプログラムはcpuを引き続き使用してcpuの利用率を高めることができる単純な切り替えは時間を占有するが、マルチオペレーティングシステムの原理は全体的に時間を節約し、cpuの利用率を高める時空多重の概念単cpu分時オペレーティングシステム:プログラムごとに順番に1つのタイムスライスを実行し、タイムスライスが回転し、FCFSタイムスライスのリアルタイムオペレーティングシステムの分散オペレーティングシステムにユーザー体験を向上させた:大きなタスクを多くの小さなタスクに分解し、それぞれ異なるオペレーティングシステムに実行し、最後にまとめた.タスク分解可能celery python分散フレームワーク
  • プロセス:進行中のプログラム.リソースの占有にはオペレーティングシステム呼び出しpidが必要です.プロセスコンピュータ内の最小のリソース割り当て単位を一意に識別できます.複数のプログラムが同時に実行されます.1つのcpuのみが、複数のプログラムが1つのcpu上で順番に実行されます.複数のプログラムを並列に実行し、複数のcpu上で同時に実行します.同期は、あることをするときに別のことを開始し、前回のイベントが終了してから非同期を実行する必要があります.Aイベントを行うときにBイベントを開始します.Aイベントが終了するのを待つ必要はありません.Bイベントブロックcpuが動作しない非ブロックcpu動作を開始できます.同期ブロック:input sleep(cpuが動作しない)非同期ブロックスレッドはプロセスの単位です.プロセス存在から離脱できないスレッドは、コンピュータにおいてcpuによってスケジューリング可能な最小単位プロセスの3つの状態図である:準備完了実行->ブロック->準備完了->実行プロセスのスケジューリングアルゴリズムは、すべてのプロセスにリソースを割り当てるか、cpu使用権を割り当てる方法短作業優先先サービスマルチレベルフィードバックアルゴリズム
  • である.
  • multiprocessingモジュール
    # multiple      
    # processing    
    # multiprocessing            
    # from multiprocessing import Process
    # import time
    # import os
    # def func(name, age):
    #     # print(os.getpid(), os.getppid(), name, age)  # pid process id  ppid parent process id
    #     print(f'       {name, age}')
    #     time.sleep(1)
    #     print('    ')
    #
    # if __name__ == '__main__':
    #     #       
    #     # print('main:', os.getpid(), os.getppid())
    #     # p = Process(target=func, args=('alex', 82))
    #     # p.start()  #         
    #     arg_lst = [('  ', 78), ('alex', 89)]
    #     p_lst = []
    #     for arg in arg_lst:
    #         p = Process(target=func, args=arg)
    #         p.start()
    #         p_lst.append(p)
    #     for p in p_lst:
    #         p.join()
    #     print('            ')
    
    #                 
    #                
    #                              ,               
    # join   
    # p.join()  #   p    ,  p          
    #                 
    from  multiprocessing import Process
    n = 0
    def func():
        global n
        n += 1
    
    if __name__ == '__main__':
        p_lst = []
        for i in range(10):
            p = Process(target=func)
            p.start()
            p_lst.append(p)
        for p in p_lst:
            p.join()
        print(n)  # 0       n            n
    
    #             socket server
    
    プロセスを開始する別の方法
    import os
    from multiprocessing import Process
    
    class MyProcess(Process):
        def __init__(self, a, b, c):
            self.a = a
            self.b = b
            self.c = c
            super().__init__()
        def run(self):
            print(os.getppid(), os.getpid(), self.a, self.b, self.c)
    
    if __name__ == '__main__':
        print(os.getpid())
        p = MyProcess(1, 2, 3)
        p.start()
        # p.terminate() #          
    
  • .
  • デーモン
    from multiprocessing import Process
    import time
    
    def son1():
        while True:
            print('in son1')
            time.sleep(1)
    
    def son2():
        for i in range(10):
            print('in son2')
            time.sleep(1)
    
    if __name__ == '__main__':
        p1 = Process(target=son1)
        p1.daemon = True  #     p1     
        p1.start()
        p2 = Process(target=son2)
        p2.start()
        p2.join()
        time.sleep(5)
        print('in main')
        #    5  in son1
    
    #              ,           
    #                       ,           
    #             ,           ,        
    #          p2      
    
  • マルチプロセスを使用して、同時socketのserver serverを実現する.py
    import socket
    from multiprocessing import Process
    def talk(conn):
        while True:
            msg = conn.recv(1024).decode('utf-8')
            ret = msg.upper().encode('utf-8')
            conn.send(ret)
        conn.close()
    
    
    if __name__ == '__main__':
        sk = socket.socket()
        sk.bind(('127.0.0.1', 9000))
        sk.listen()
        while True:
            conn, addr = sk.accept()
            Process(target=talk, args=(conn,)).start()
        sk.close()
    
    client.py
    import socket
    import time
    
    sk = socket.socket()
    sk.connect(('127.0.0.1', 9000))
    
    while True:
        sk.send(b'hello')
        msg = sk.recv(1024).decode('utf-8')
        print(msg)
        time.sleep(0.5)
    
    sk.close()
    
  • #                 acquire  
    #            
    #      
    import json
    import time
    from multiprocessing import Process, Lock
    
    def search(i):
        with open('ticket', encoding='utf-8') as f:
            ticket = json.load(f)
        print(f"{i}:      {ticket['count']}")
    
    def buy_ticket(i):
        with open('ticket', encoding='utf-8') as f:
            ticket = json.load(f)
        if ticket['count'] > 0:
            ticket['count'] -= 1
            print(f'{i}    ')
        time.sleep(0.2)
        with open('ticket', mode='w', encoding='utf-8') as f:
            json.dump(ticket, f)
    def get_ticket(i, lock):
        search(i)
        # lock.acquire()
        # buy_ticket(i)
        # lock.release()
        #   
        with lock:  #     ,         ,                 
            buy_ticket(i)
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            Process(target=get_ticket, args=(i, lock)).start()
    #               
    # import time
    # from multiprocessing import Lock, Process
    #
    # def func(i, lock):
    #     lock.acquire()  #    
    #     print(f'      {i}')
    #     time.sleep(1)
    #     lock.release()  #    
    #
    #
    # if __name__ == '__main__':
    #     lock = Lock()
    #     for i in range(10):
    #         Process(target=func, args=(i, lock)).start()
    
  • 生産者消費者モデル
    #         
    
    
    #       (IPC) Inter Process Communication
        #     :                 Queue
    #     #     :                  
    # from multiprocessing import Queue, Process
    #
    # def son(q):
    #     q.put('hello')
    #
    # if __name__ == '__main__':
    #     q = Queue()
    #     Process(target=son, args=(q, )).start()
    #     print(q.get())
    #     # son   mian      
    
    #         
    #        
    #      
    #                                
    # from multiprocessing import Process, Queue
    # import time
    # import random
    #
    # def consumer(q):#     :                
    #     for i in range(10):
    #         print(q.get())
    #
    #
    # def producer(q):  #    :                     
    #     for i in range(10):
    #         time.sleep(random.random())
    #         q.put(i)
    #
    #
    # if __name__ == '__main__':
    #     q = Queue()
    #     c1 = Process(target=consumer, args=(q,))
    #     p1 = Process(target=producer, args=(q,))
    #     c1.start()
    #     p1.start()
    # import requests
    # from multiprocessing import Process, Queue
    #
    # def producer(i, url, q):
    #     ret = requests.get(url)
    #     q.put((i, ret.status_code))
    #     # print(ret.status_code)
    #
    # if __name__ == '__main__':
    #     q = Queue()
    #     url_lst = ['https://www.cnblogs.com/Eva-J/p/7277026.html',
    #                'https://blog.csdn.net/qq_31910669',
    #                'https://blog.csdn.net/qq_31910669/article/details/109136837']
    #     # for i, url in enumerate(url_lst):
    #     #     producer(i, url, '1')
    #     for index, url in enumerate(url_lst):
    #         Process(target=producer, args=(index, url, q)).start()
    #     for i in range(3):  #     ,                   
    #         print(q.get())
    import requests
    from multiprocessing import Process, Queue
    
    def producer(i, url, q):
        ret = requests.get(url)
        q.put((i, ret.text))
        # print(ret.status_code)
    
    def consumer(q):
        while True:
            res = q.get()
            if res is None:
                break
            with open(f'{res[0]}.html', mode='w', encoding='utf-8') as f:
                f.write(res[1])
    
    
    if __name__ == '__main__':
        q = Queue()
        url_dic = {
           'cn':'https://www.cnblogs.com/Eva-J/p/7277026.html',
                   'mu':'https://blog.csdn.net/qq_31910669',
                   'li':'https://blog.csdn.net/qq_31910669/article/details/109136837'}
        p = []
        for key in url_dic:
            p1 = Process(target=producer, args=(key, url_dic[key], q))
            p1.start()
            p.append(p1)
        Process(target=consumer, args=(q,)).start()
        for p2 in p:
            p2.join()
        q.put(None)
    
  • データ共有
    #                   ,  lock
    from multiprocessing import Process, Manager, Lock
    
    def change_dic(dic, lock):
        with lock:
            dic['count'] -= 1
    
    
    if __name__ == '__main__':
        m = Manager()
        dic = m.dict({
           'count':100})
        lock = Lock()
        p_lst = []
        for i in range(50):
            p = Process(target=change_dic, args=(dic, lock))
            p.start()
            p_lst.append(p)
        for p in p_lst:
            p.join()
        print(dic)
    
  • スレッドthreadingモジュール
    #   :    ,         ,      ,      
        #        multiprocessing,      start join
        #            Lock
        #         :
            #   (  )    
            #      :redis
        #          Manager
        #         
    
    
    #   
        #      :             
        #            
        #              cpu  ?      
        #       ,     ,           
    
    # CPython     
        # gc       
            #      +     
        #            gc     
        #       (GIL)global interpreter lock
            #                         cpu  
        #     IO    
    
    # threading  
    import time
    from  threading import Thread, current_thread, enumerate, active_count
    # current_thread()     id   current_thread().ident     id
    def func(i):
        print(f'start{i},{current_thread()}')
        time.sleep(1)
        print(f'end{i}')
    
    
    if __name__ == '__main__':
        t1 = []
        for i in range(10):
            t = Thread(target=func, args=(i,))
            t.start()
            t1.append(t)
        print(enumerate(), active_count()) # 11         
        for t in t1:
            t.join()
        # print(enumerate(), active_count())  # 1    1    
        print('          ')
    
    #         terminate
    #              
    # enumerate                
    # active_count                
    #              
    #            
    
  • デーモンスレッドおよびスレッドロック
    # import time
    # from threading import Thread
    #
    # def son():
    #     while True:
    #         print('in son')
    #         time.sleep(1)
    # def son2():
    #     for i in range(10):
    #         print('in son2')
    #         time.sleep(1)
    # t = Thread(target=son)
    # t.daemon = True  #     ,             
    # Thread(target=son2).start()  #                          
    # t.start()
    #                
    #    ?
        #            
    #                          
    #    ?
        #               
    
    #    
    from threading import Thread,Lock
    n = 0
    def add(lock):
        with lock:
            for i in range(200000):
                global n
                n += 1
    
    def sub(lock):
        with lock:
            for i in range(200000):
                global n
                n -= 1
    lock = Lock()
    Thread(target=add, args=(lock,))
    Thread(target=sub, args=(lock,))
    print(n)
    
    # += -=           +           
    # append pop    
    #               
    
  • 再帰錠とデッドロック(科学者が麺を食べる)
    #     RLock (Recursion)
        #          acquire  
        #          ,                  ,   
        #    ,       ,     ,      ,   .   ,       
    
    # from threading import Lock, RLock
    # rl = RLock()
    # rl.acquire()
    # rl.acquire()
    # rl.acquire()
    # rl.acquire()
    # print('     ')
    # rl.acquire()
    # rl.acquire()
    # rl.acquire()
    # rl.acquire()
    # l = Lock()
    # l.acquire()
    # print('     ')
    # l.release()
    # from threading import  RLock, Thread
    #
    # def func(i, lock):
    #     lock.acquire()
    #     lock.acquire()
    #     lock.acquire()
    #     print(f'{i}:start')
    #     lock.release()
    #     # lock.release()
    #     # lock.release()
    #     print(f'{i}:end')
    # lock = RLock()
    # for i in range(5):
    #     Thread(target=func, args=(i, lock)).start()
    #     
    #                             
    from threading import Lock, Thread, RLock
    import time
    
    # noodle_lock = Lock()
    # fork_lock = Lock()
    noodle_lock = fork_lock = RLock()  #         
    
    def eat(name):
        noodle_lock.acquire() #     
        print(f'{name}    ')
        fork_lock.acquire()
        print(f'{name}     ')
        print(f'{name}  ')
        time.sleep(0.1)
        fork_lock.release()
        print(f'{name}     ')
        noodle_lock.release()
        print(f'{name}    ')
    
    def eat2(name):
        fork_lock.acquire()
        print(f'{name}     ')
        noodle_lock.acquire()  #     
        print(f'{name}    ')
        print(f'{name}  ')
        time.sleep(0.1)
        noodle_lock.release()
        print(f'{name}    ')
        fork_lock.release()
        print(f'{name}     ')
    
    
    Thread(target=eat, args=('alex',)).start()
    Thread(target=eat2, args=('taibai',)).start()
    Thread(target=eat, args=('wusie',)).start()
    Thread(target=eat2, args=('dazhuang',)).start()
    
    #          :                
    #              ,                        
    #         ,      ,       
    
  • キュー
    # import queue  #              
    #
    # q = queue.Queue(4)  # FIFO     
    # # q.put(1)
    # q.put(2)
    # # q.get()
    # q.put(3)
    # q.put(4)
    # print('4 done')
    # q.put(5)
    # print('5 done')
    
    # q.get_nowait()
    # q.put_nowait()
    
    from queue import LifoQueue, PriorityQueue
    # LifoQueue  last in first out     
    # PriorityQueue      
    priq = PriorityQueue()
    priq.put((2, 'alex'))
    priq.put((5, 'xia'))
    priq.put((0, 'ming'))
    
    print(priq.get())
    print(priq.get())
    print(priq.get())
    # (0, 'ming')
    # (2, 'alex')
    # (5, 'xia')
    
    #            
        #      Queue
        #      LifoQueue
        #     PriorityQueue
    
  • プール
    #     
        #          ,                 
        #        ,    
    #       
        #        /  ,                      
        #                   ,           
        #           /      /       
        #      /                    ,       
        #             ,         
    
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from threading import current_thread
    import time
    import os
    # threading         
    # multiprocessing      threading  Pool
    # concurrent.futures                            /  
    # def func(a, b):
    #     print(current_thread().ident, 'start', a, b)
    #     time.sleep(1)  #         ,           
    #     print(current_thread().ident, 'end')
    #
    #
    # tp = ThreadPoolExecutor(4)
    # for i in range(20):
    #     tp.submit(func, i, b=i+1)  #        ,   
    
    #         
    #        , submit    
    #    
    def func(a, b):
        print(current_thread().ident, 'start', a, b)
        time.sleep(1)  #         ,           
        print(current_thread().ident, 'end')
        return a*b
    
    
    if __name__ == '__main__':
        pp = ProcessPoolExecutor(4)
        future_l = []
        for i in range(20):
            ret = pp.submit(func, i, b=i+1)  #        ,   
            # print(ret)  #              ,         ,   result       
            # print(ret.result())
            future_l.append(ret)  #       
        for ret in future_l:
            print(ret.result())
    
    # map
    # ret = pp.map(func, range(20))
    # for key in ret:
    #     print(key)
    
    #     ?????????    
    
  • コモン
    """
                       
                 
                                       IO  ,                     
    
    """
    #      IO     
    # gevent      greenlet           +      IO   
    # asyncio        yeild
    
  • geventモジュール
    # import gevent
    #
    # def func():  #   IO          ,    func gevent
    #     print('start func')
    #     gevent.sleep(1)
    #     print('end func')
    #
    # g1 = gevent.spawn(func)
    # g2 = gevent.spawn(func)
    # g3 = gevent.spawn(func)
    # gevent.joinall([g1, g2, g3])
    # # g1.join()  #         g1      
    # # g1.join()  #         g1      
    # # g1.join()  #         g1      
    from gevent import monkey
    monkey.patch_all()
    import time
    import gevent
    
    def func():  #   IO          ,    func gevent
        print('start func')
        time.sleep(1)
        print('end func')
    
    g1 = gevent.spawn(func)
    g2 = gevent.spawn(func)
    g3 = gevent.spawn(func)
    gevent.joinall([g1, g2, g3])
    # g1.join()  #         g1      
    # g1.join()  #         g1      
    # g1.join()  #         g1      
    
  • asyncioモジュール
    import asyncio
    
    async def func(name):
        print(f'start{name}')
        # await           
        # await          async    , py3.8    
        await asyncio.sleep(1)
        print('end')
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([func('alex'), func('taibai')]))  #   
    
    #