Pythonスレッドとプロセス

45797 ワード

PythonマルチスレッドはI/O密集型タスク、Pythonマルチプロセスは計算密集型タスクに適用されます.
1マルチスレッド
スレッドには、プライマリ・スレッド、サブスレッド、デーモン・スレッド(バックグラウンド・スレッド)、フロント・スレッドが含まれます.
GIL(グローバルインタプリタロック)はPythonのマルチスレッドを本物のマルチスレッドではないようにし、インタプリタは任意のPythonコードを実行する場合、まずこのロックを取得する必要があり、I/O操作に遭遇したときにこのロックを解放します.Pythonのプロセスは全体として,インタプリタプロセス内では1つのスレッドのみが実行され,他のスレッドはGILの解放を待機状態で待っている.
一般的にGILの存在は同じ時刻に1つのスレッドしか実行できないが,マルチスレッドは真の同時ではなく,交互に実行される.Pythonのマルチスレッド同時化は,単一スレッド(スレッド切替に時間がかかるため)に及ばない可能性があり,マルチコアCPUを利用するには,マルチプロセスを考慮することができる.
1.1スレッドの作成
import time
import threading

class MyThread(threading.Thread):
    def run(self):
        for i in range(4):
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(2)

def main():
    print('Start main threading')
    threads = [MyThread() for i in range(3)]
    for t in threads:
        t.start()
    print('End main threading')

if __name__ == '__main__':
    main()
#    :
Start main threading         #     
thread Thread-13, @number: 0 #     :3          
thread Thread-14, @number: 0
thread Thread-15, @number: 0
End main threading           #     

thread Thread-13, @number: 1 #   3 3      :3              /     
thread Thread-14, @number: 1
thread Thread-15, @number: 1

thread Thread-15, @number: 2
thread Thread-14, @number: 2
thread Thread-13, @number: 2

thread Thread-15, @number: 3
thread Thread-14, @number: 3
thread Thread-13, @number: 3

1.2スレッドのマージ-join
joinメソッド:joinメソッドを呼び出されたサブスレッドの実行が終了するまで、現在の親スレッドをブロックします.一般に、スレッドAが実行中であり、スレッドB(およびC)が割り込むと、Aはブロックされて運転を一時停止し、AはB(およびC)の運転が終了してから運転を継続する.Cがある場合、BとCの間には親/子関係ではないため、BとCの間にはパラレルインタラクションが同時に実行され、誰もブロックされていないことを強調する必要があります.
joinメソッドでパラメータを設定すると、サブスレッドが親スレッドをブロックする時間を表します.この時間が過ぎると、親スレッドは実行を続け、スレッドがどのような状態であるかは管理されません(親測:親スレッドはkillサブスレッドではありませんが、一部のブログではkillサブスレッドができると言われています).
#1.2.1   1.1    main  ,  2 
def main():
    print('Start main threading')
    threads = [MyThread() for i in range(3)]
    for t in threads: #     ,3        ,       
        t.start()
    for t in threads: #     ,       ,3          
        t.join()
        print('Next FOR operation')
    print('End main threading')
#    :
Start main threading
thread Thread-20, @number: 0
thread Thread-21, @number: 0
thread Thread-22, @number: 0

thread Thread-20, @number: 1
thread Thread-22, @number: 1
thread Thread-21, @number: 1

thread Thread-20, @number: 2
thread Thread-21, @number: 2
thread Thread-22, @number: 2

thread Thread-20, @number: 3
thread Thread-21, @number: 3
thread Thread-22, @number: 3

Next FOR operation #    , 12Next FOR operation
Next FOR operation
End main threading #   3
#1.2.2   main    ,   1.2.1  ,      :
# 1.2.1 , 3           , 1          ,  2         ,        ,   3        ,         ;
#    ,   2         ,  2       ,   1、3       ,         。    ,    3           。
def main():
    print('Start main threading')
    threads = [MyThread() for i in range(3)]
    for t in threads:
        t.start()
    threads[1].join() # 2         ,      threads[0] threads[2]
    print('End main threading')
#1.2.3   main    ,   1.2.1 1.2.2      
def main():
    print('Start main threading')
    threads = [MyThread() for i in range(3)]
    #                ,      (    for  ,              )
    for t in threads:
        t.start()
        t.join()
        print('Next FOR operation')
    print('End main threading')
#    :    ,   main  for           ,          ,  3                
Start main threading
thread Thread-23, @number: 0
thread Thread-23, @number: 1
thread Thread-23, @number: 2
thread Thread-23, @number: 3
Next FOR operation

thread Thread-24, @number: 0
thread Thread-24, @number: 1
thread Thread-24, @number: 2
thread Thread-24, @number: 3
Next FOR operation

thread Thread-25, @number: 0
thread Thread-25, @number: 1
thread Thread-25, @number: 2
thread Thread-25, @number: 3
Next FOR operation
End main threading

1.3スレッドの同期とスレッドロック
スレッドがプロセスより軽量なのは、メモリを共有しているためです.つまり、各スレッドはメモリデータに平等にアクセスでき、短時間でメモリを変更したデータを「同時に並列に」読み込むと、データが同期しない可能性があります.したがって、同時にマルチスレッドを並列に行う場合は注意してください.特に、各サブスレッドがメモリデータを同時に変更する場合.スレッドの非同期によるデータの非同期を回避するために、リソースにアクセスするには、スレッドがリソースをロックするためにロックを取得する必要があります.方法は,リソースにアクセスする前にロックが取得されたか否かを判断し,リソースにアクセスした後にロックを解除する.threadingモジュールのロック機能を利用できます.
#   
mutex = threading.Lock()
#   ,    ,    
if mutex.acquire():
    #       ……
    #   ,      
    mutex.release()

1.3.1デッドロック
複数のサブスレッドが実行される前にロックを取得する必要があるというデッドロック現象が発生する場合がありますが、ロックを取得するには他のスレッドがロックを解放する必要がありますが、他のスレッドは実行されず、ロックを解放できません.通俗的に言えば、この時袋小路に陥ったのは、デッドロックだ.
もう1つの言い方は、2つのglobalリソースaとbがあり、2つのスレッドt 1とt 2があり、t 1がaを占有するとbにアクセスしたいが、このときt 2がbを占有し、aにアクセスしたいとし、2つのスレッドはこのとき所有していたリソースを解放しない、すなわちデッドロックである.
import time
import threading
mutex_a = threading.Lock()
mutex_b = threading.Lock()

class MyThread(threading.Thread):
    def task_a(self):
        if mutex_a.acquire():
            ……
            if mutex_b.acquire():
                ……
                mutex_b.release()
            mutex_a.release()
    def task_b(self):
        if mutex_b.acquire():
            ……
            if mutex_a.acquire():
                ……
                mutex_a.release()
            mutex_b.release()
    def run(self):
        self.task_a()
        self.task_b()

def main():
    print "Start main threading"
    threads = [MyThread() for i in range(2)]
    for t in threads:
        t.start()
    print "End Main threading"

以上のコード出力結果は空です!
1.3.2再ロック可能
再ロック可能:同じスレッドで同じリソースを複数回要求するために、Pythonは再ロック可能(RLock)を提供します.RLockの内部にはロックとcounter変数が維持されており、counterはacquireの回数を記録し、1つのスレッドのすべてのacquireがreleaseされるまでリソースを複数回requireすることができ、他のスレッドはリソースを得ることができます.詳細は省略する.
1.3.3条件変数ロック
条件変数ロック:複雑な場合、ロックについていくつかの条件判断が必要です.Pythonは、acquireメソッドとreleaseメソッドのほかにwaitメソッドとnotifyメソッドを提供するConditionオブジェクトを提供します.スレッドはまず条件変数ロックをacquireします.条件が成立しない場合、スレッドwait;成立したらこのスレッドを実行し、notifyの他のスレッドを実行することもできます.他のwait状態のスレッドは通知を受けて条件を再判断します.条件変数ロックは,異なるスレッドが前後してacquireでロックを獲得し,条件が成立しなければロック/RLockのwaitingプールに投げ込まれ,他のスレッドnotifyに直行した後に条件を再判断すると見なすことができる.このモードは、生成者消費者モードによく用いられる.
import time
import threading
import random
queue = []
con = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        while True:
            # 1 con.aqcuire()  True,      con.acquire() True ,    con.acquire() False,    
            if con.acquire():
                if len(queue) > 5: #     5    ,     
                    con.wait()
                else:
                    elem = random.randrange(5)
                    queue.append(elem)
                    print("Produce elem {}, size is {}".format(elem, len(queue)))
                    time.sleep(0.5)
                    con.notify() #      1   ,         ,       
                con.release() #      (           ,     )
                time.sleep(2) #  2 ,                 

class Consumer(threading.Thread):
    def run(self):
        while True:
            if con.acquire():
                if len(queue) < 0: #       ,      
                    con.wait()
                else:
                    elem = queue.pop()
                    print("Consume elem {}, size is {}".format(elem, len(queue)))
                    time.sleep(0.5)
                    con.notify() #      1   ,         ,        
                con.release() #      (           ,     )
                time.sleep(2) #  2 ,                 

def main():
    for i in range(3):
        Producer().start()
    for i in range(2):
        Consumer().start()

if __name__ == '__main__':
    main()
#    (      ,    !):
Produce elem 3, size is 1
Consume elem 3, size is 0
Produce elem 2, size is 1
Produce elem 3, size is 2
Consume elem 3, size is 1
Produce elem 2, size is 2
Produce elem 0, size is 3
Consume elem 0, size is 2
Produce elem 0, size is 3
Produce elem 3, size is 4
Consume elem 3, size is 3
Produce elem 1, size is 4
Produce elem 1, size is 5
Consume elem 1, size is 4
Produce elem 1, size is 5
Produce elem 4, size is 6
Consume elem 4, size is 5
Produce elem 4, size is 6
……

1.3.4キュー
以上の生産者消費者モデルはキューを操作し,Pythonにキュー構造Queueがあり,その内部にロックに関する設定が実現されている.Queueで生産者消費者モデルを書き換えると、以下のようになります.
import queue #Python2  Queue,Python3   queue
import threading
import time
import random
q5 = queue.Queue(5) #      5,  5,    

class Producer(threading.Thread):
    def run(self):
        while True:
            elem = random.randrange(5)
            q5.put(elem) #      ,Queue.put    ,   ,            
            print("Produce elem {}, size is {}".format(elem, queue.qsize()))
            time.sleep(2)

class Consumer(threading.Thread):
    def run(self):
        while True:
            elem = q5.get() #      ,Queue.get    ,   ,            
            q5.task_done() #     get  fetch  task ,task_done     task  
            print("Consume elem {}, size is {}".format(elem, queue.qsize()))
            time.sleep(2)

def main():
    for i in range(3):
        Producer().start()
    for i in range(2):
        Consumer().start()

if __name__ == '__main__':
    main()
#    :  1.3.3   
Produce elem 4, size is 1
Produce elem 1, size is 2
Produce elem 1, size is 3
Consume elem 4, size is 2
Consume elem 1, size is 1
Produce elem 4, size is 2
Produce elem 1, size is 3
Produce elem 3, size is 4
Consume elem 1, size is 3
Consume elem 4, size is 2
Consume elem 1, size is 1
……
Consume elem 1, size is 3
Produce elem 2, size is 4
Produce elem 4, size is 5
……

1.4 ThreadLocal
他のスレッドと変数を共有したくない場合はlocal変数として定義できますが、関数で定義されたlocal変数は関数間で伝達するのが特に面倒です.この場合はThreadLocalを使用できます.定義はglobal変数で、各サブスレッドで使用できますが、使用するとそのglobal変数はサブスレッド内部のローカル変数になり、他のスレッドでは理性的ではありません.
import threading

def test(name):
    print('Current thread: {}'.format(threading.currentThread().name))
    local.name = name #             ThreadLocal   ,            
    print("{} in {}".format(local.name, threading.currentThread().name))

local = threading.local() #  local ThreadLocal  ,          ,            
t1 = threading.Thread(target=test, args=('Tom',))
t2 = threading.Thread(target=test, args=('Lina',))
t1.start()
t2.start()
#    :
Current thread: Thread-31
Tom in Thread-31
Current thread: Thread-32
Lina in Thread-32

1.5 Eventインタフェース
threadingモジュールのEventインタフェースのwaitおよびsetメソッドは、スレッドをブロックおよび起動するためにそれぞれ使用され、スレッドを制御する役割を果たす.
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, event):
        super(MyThread, self).__init__()
        self.event = event
    def run(self):
        print("thread {} is ready ".format(self.name))
        self.event.wait() #     (  event.set     )
        print("thread {} run".format(self.name))

signal = threading.Event() #    Event   signal,       signal          

def main():
    start = time.time()
    for i in range(3):
        t = MyThread(signal) # Event   signal    3   
        t.start()
    time.sleep(3)
    print("after {}s".format(time.time() - start))
    signal.set() #3       wait     ,set          

if __name__ == '__main__':
    main()
#    :
thread Thread-11 is ready 
thread Thread-12 is ready 
thread Thread-13 is ready 
after 3.003596544265747s
thread Thread-12 run
thread Thread-13 run
thread Thread-11 run

1.6バックグラウンドスレッド
C/C++では、メインスレッドが終了すると、そのサブスレッドはデフォルトでメインスレッドkillによって削除されます.Pythonでは、サブスレッドが終了するのをデフォルトで待ってから、メインスレッドが終了します.メインスレッドを終了させるには、サブスレッドが実行終了するかどうかにかかわらずkillされ、daemonプロパティをTrueにしてサブスレッドをメインスレッドのバックグラウンドスレッドにし、メインスレッドとともに終了させることができます.
#     ,      ,daemon      ?……
import threading
import time

class MyThread(threading.Thread):
    def run(self):
        print('thread {} will wait {}s'.format(self.name, 2))
        time.sleep(2)
        print("thread {} finished".format(self.name))

def main():
    print("Start main threading")
    for i in range(3):
        t = MyThread()
        t.daemon = True #               
        t.start()
    print("End Main threading")

if __name__ == '__main__':
    main()
#    :       
Start main threading
thread Thread-85 will wait 2s
thread Thread-86 will wait 2s
thread Thread-87 will wait 2s
End Main threading
thread Thread-85 finished
thread Thread-87 finished
thread Thread-86 finished

1.7 map関数によるマルチスレッドの実装
Pythonにはmap関数を含む2つのライブラリがある:multiprocessingとそのサブライブラリmultiprocessing.dummy.dummy、後者は前者の完全なクローンであり、唯一の違いはmultiprocessingがプロセスに使用され、dummyがスレッドに使用されることである.
mapはmap(func,[arg 1,arg 2,...,argn])として使用され、mapは各パラメータargiを関数funcに伝達し、スレッドを異なるCPUに分け、シーケンス操作、パラメータ伝達、結果保存などの一連の操作を完了することができる.
from urllib.request import urlopen
from multiprocessing.dummy import Pool

pool = Pool() #     ,         CPU  ,    (     ,              )
urls = ['http://www.zhihu.com', 'http://www.126.com', 'http://cn.bing.com']
results = pool.map(urlopen, urls) #    url      ,            results
print(results)
pool.close()
print('main ended')
#    :
[.client.HTTPResponse object at 0x7f909000e518>, .client.HTTPResponse object at 0x7f909000e748>, .client.HTTPResponse object at 0x7f909068dc88>]
main ended

2マルチプロセス
マルチプロセス:プロセスパッケージmultiprocessingは、1つの関数を定義するだけで、サブプロセス、通信と共有データをサポートし、異なる形式の同期を実行し、Process、Queue、Pipe、Lockなどのコンポーネントを提供します.
2.1 Process
作成プロセスのクラス:Process([group[,target[,name[,args[,kwargs]]]]]]),targetは呼び出しオブジェクト,nameは別名,argsは呼び出しオブジェクトの位置パラメータメタグループ,kwargsは呼び出しオブジェクトの辞書,groupは実質的に使用されない.
2.1.1プロセスとして関数を作成する
Processを使用して関数を構築するtargetパラメータとargsパラメータ:target=関数名、args=関数で使用するパラメータメタグループ
#      ,       
import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")
def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")
def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (1,))
    p2 = multiprocessing.Process(target = worker_2, args = (2,))
    p3 = multiprocessing.Process(target = worker_3, args = (3,))
    p1.start() #3         
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name: " + p.name + "\tp.id:" + str(p.pid))
    print("END!!!!")
#    :
worker_1 # 1       
The number of CPU is:8
child p.name: Process-5  p.id29126
child p.name: Process-3  p.id29122
child p.name: Process-4  p.id29123
END!!!!
worker_2 # 2       ,          ,            
worker_3
end worker_1
end worker_2
end worker_3

コードp3.start()の後に1行のコードを追加します:time.sleep(0.5)
#    :
worker_1
worker_2
worker_3 #0.5        2 3    ,          
The number of CPU is:8
child p.name:Process-12 p.id3610
child p.name:Process-14 p.id3622
child p.name:Process-13 p.id3611
END!!!!
end worker_1
end worker_2
end worker_3

2.1.2プロセスをクラスとして定義する
クラスの__init__関数ではProcess.__init__(self)を実現し,run関数では具体的な機能を実現する.
import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    def run(self):
        for i in range(5):
            print("time is {0}".format(time.ctime()))
            time.sleep(self.interval)

if __name__ == '__main__':
    p = ClockProcess(2)
    p.start()
#    :
time is Mon Jun 20 18:58:52 2016
time is Mon Jun 20 18:58:54 2016
time is Mon Jun 20 18:58:56 2016
time is Mon Jun 20 18:58:58 2016
time is Mon Jun 20 18:59:00 2016

2.1.3 daemonプロパティ
daemonプロパティは、プロセスがバックグラウンドプロセスであるかどうかを決定し、バックグラウンドプロセスはメインプロセスの終了に伴って終了します(ただし、実際の操作ではそうではありません.なぜですか?)
#daemon       False
import multiprocessing
import time

def worker(interval):
    print("work start: {0}".format(time.ctime()));
    time.sleep(interval)
    print("work end: {0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(2,))
    #p.daemon = True
    p.start()
    print("end!")
#    :
end!
work start: Mon Jun 20 19:10:41 2016
work end: Mon Jun 
p.daemon = Trueを加えると、出力は変わりません!?原因はまだ調査中・・・
バックグラウンド・プロセスの実行を完了する方法は、メイン・プロセス・コードにp.join()行を追加することです.
2.2 Lock
ロックは、現在のリソースが1つのプロセスにのみ使用されることを制限し、複数のプロセスが共有リソースにアクセスする必要がある場合、アクセス競合を回避するために使用できます.
import multiprocessing

def worker_with(lock):
    with lock: # lock,      ,         
        for i in range(5):
            print("Lock acquired via with")

def worker_no_with(lock):
    lock.acquire()
    for i in range(5):
        print("Lock acquired directly")
    lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(target=worker_with, args=(lock,))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock,))
    w.start()
    nw.start()
    print("end")
#    :
end
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired via with
Lock acquired directly #lock   worker_with  ,       ,worker_no_with   
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

2.3 Semaphore
Semaphoreは、プールの最大接続数など、共有リソースへのアクセス数を制御するために使用されています..
2.4 Event
Eventはプロセス間の同期通信の継続を実現するために使用される......
2.5 Queue
Queueはマルチプロセスセキュリティキューであり、マルチプロセス間のデータ転送を実現することができ、主にputとgetの方法があり、それぞれデータをキューに挿入し、キューから要素を読み取り、削除するために使用される.続きは...
2.6 Pipe
Pipeメソッド戻り(conn 1,conn 2)は、パイプの両端が続くことを示します.
2.7 Pool
Poolは、ユーザが呼び出すために指定された数のプロセスを提供することができる.新しいリクエストがpoolにコミットされたとき、満たされていない場合は、新しいプロセスを作成してリクエストを実行します.満タンになると、poolでプロセスが終了するまでリクエストは待機し、リクエストを実行するために新しいプロセスが作成されます.続きは...
リファレンスリンク
参照先:http://python.jobbole.com/85050/参照先:http://python.jobbole.com/85177/参照先:http://www.cnblogs.com/kaituorensheng/p/4445418.html