Pythonマルチスレッドとマルチプロセス(3)スレッドの同期の条件変数
8393 ワード
このシリーズの記事ディレクトリ
展開/終了 Pythonマルチスレッドとマルチプロセス(一)GILロックとThreadを使用したマルチスレッド の作成 Pythonマルチスレッドとマルチプロセス(二)スレッドの同期の反発ロックと再入ロック Pythonマルチスレッドとマルチプロセス(3)スレッドの同期の条件変数 Pythonマルチスレッドとマルチプロセス(4)スレッドの同期信号量 Pythonマルチスレッドとマルチプロセス(5)マルチスレッド管理-スレッドプール Pythonマルチスレッドとマルチプロセス(六)マルチプロセスプログラミングと同期 同期方式2:条件変数
まず、条件変数はマルチスレッド競合の共有リソースであるため、反発ロックと組み合わせて使用する必要があります.条件変数により待機と通知のメカニズムを実現できる.
最も基本的な使用方法は次のとおりです.
そのうち:cond.acquire() 実は、lock.acquire()cond.wait()に相当する反発ロックで鍵をかけます. 3つのことができます:1.反発ロックを解放し、同じ条件変数を使用した他のスレッドがロックを取得して実行できるようにします.2.ブロックスリープ状態に入り、CPUを逃がす.3.本スレッドのwait()が他のスレッドnotifyによって起動されると、wait()はロックを再取得します.cond.notify() 他のスレッドに通知し、他のスレッドのwait()を起動します.cond.release() 反発ロックの解除
wait()とrelease()はロック内で実行する必要があります.ロックを取得せずにwait()とrelease()を直接呼び出すとエラーが表示されます
PS:使用 with cond: if my_condition: cond.wait() do_something() cond.notify() および cond.acquire() if my_condition: cond.wait() do_something() cond.notify() cond.release()
同じです.
withを使うのはコードの外にロックをかけたのと同じです.
例1:マルチスレッドの秩序ある実行を制御する2人のA,Bが会話を行い,それぞれ1~6という数字を言い,Aに先に言うように要求し,AがBを言ってから言うことができ,BがAを言ってからA:1 B:2 A:3 B:4 A:5 B:6と言うことができる
ここでは条件変数を用いて実現した
もちろん,複数の反発ロックを用いてマルチスレッドの秩序化実行を実現することもでき,前述の反発ロックの例で示した.
例2:相互反発ロック、条件変数、リストを使用して、スレッドの安全なキューを実現します.
次はConditionの下位層がどのように実現されているかを説明します:1.Conditionをインスタンス化するとき、Conditionの_init__条件変数オブジェクトの使用を保護するためのRLock再読み込みロックが生成されます.この錠をRと呼ぶ
2.wait()を実行する前にcond.acquire()を実行して条件変数に鍵をかけなければならない.上の鍵はRである. wait()を実行するとき、wait()はいくつかのことをしました. 2−1.wait()は反発ロックを作成し、この反発ロックをXと呼び、Xに対してacquire()ロック:X.acquire()を呼び出し、その後、ロックXを双方向キューQに配置する. 2-2.wait()ロックRを解放し、他のスレッドがロックRを取得し、いくつかのタスクコードを実行できるようにする 2-3.wait()はRを解放した後、Xに対してもう一度ロックします.X.acquire()Xに2回連続してロックするため、デッドロックが発生し、wait()がブロック状態に入る. だから条件変数のwait()はデッドロック方式でブロック待ちの機能を実現します!! 2-4.wait()のロックXが他のスレッドのnotify()によって解放されると、Rに再ロックされ、ロックXは二度と使用されません.次回wait()を呼び出すと新しいXロックが生成されます 3.他のスレッドはロックRを取得し、いくつかのタスクコードを実行した後、notify()を実行して前のスレッドのwait()を起動する notify()はいくつかのことをしました. 3−1.キューQヘッダからロックXをポップアップし、ロックXを解放する.ロックXを解放することによってwait()を起動する.その後この錠Xは永遠に使われません まとめ:Conditionの実装には2つのロックが使われています:_init__()時に作成される再ロックRとwait()は、条件変数と共有変数を保護するために作成された反発ロックX RのスレッドセキュリティXであり、スレッドセキュリティではなく、デッドロックを製造してブロック効果を達成するために使用される
Rは繰り返し使用され、Xは使い捨てで、毎回新しいXが生成されます
以下にCondition中__を貼り付けますinit__,wait()とnotify()のソース:
Pythonマルチスレッドとマルチプロセス(三)スレッドの同期の条件変数
展開/終了
まず、条件変数はマルチスレッド競合の共有リソースであるため、反発ロックと組み合わせて使用する必要があります.条件変数により待機と通知のメカニズムを実現できる.
最も基本的な使用方法は次のとおりです.
cond = Condition() #
cond.acquire() #
cond.wait() # , , notify
do_something()
cond.notify() # cond
cond.release()
そのうち:cond.acquire() 実は、lock.acquire()cond.wait()に相当する反発ロックで鍵をかけます. 3つのことができます:1.反発ロックを解放し、同じ条件変数を使用した他のスレッドがロックを取得して実行できるようにします.2.ブロックスリープ状態に入り、CPUを逃がす.3.本スレッドのwait()が他のスレッドnotifyによって起動されると、wait()はロックを再取得します.cond.notify() 他のスレッドに通知し、他のスレッドのwait()を起動します.cond.release() 反発ロックの解除
wait()とrelease()はロック内で実行する必要があります.ロックを取得せずにwait()とrelease()を直接呼び出すとエラーが表示されます
PS:使用 with cond: if my_condition: cond.wait() do_something() cond.notify() および cond.acquire() if my_condition: cond.wait() do_something() cond.notify() cond.release()
同じです.
withを使うのはコードの外にロックをかけたのと同じです.
例1:マルチスレッドの秩序ある実行を制御する2人のA,Bが会話を行い,それぞれ1~6という数字を言い,Aに先に言うように要求し,AがBを言ってから言うことができ,BがAを言ってからA:1 B:2 A:3 B:4 A:5 B:6と言うことができる
from threading import Condition,Thread
cond = Condition()
a_say = [1,3,5]
b_say = [2,4,6]
class A(Thread):
def __init__(self,cond,say):
super(A,self).__init__(name="A")
self.cond = cond
self.say = say
def run(self):
self.cond.acquire()
for i in range(len(self.say)):
print("%s say %d" % (self.name,self.say.pop(0)))
self.cond.notify() # A B, B
if len(self.say):
self.cond.wait() # A , B , B A,A
self.cond.release()
class B(Thread):
def __init__(self,cond,say):
super(B,self).__init__(name="B")
self.cond = cond
self.say = say
def run(self):
self.cond.acquire()
for i in range(len(self.say)):
self.cond.wait() # A B , B
print("%s say %d" % (self.name,self.say.pop(0)))
self.cond.notify() # B A, A
self.cond.release()
if __name__=="__main__":
a = A(cond,a_say)
b = B(cond,b_say)
b.start() # b ,a , a , a b wait() notify(), notify() 。 a wait().b wait() ,
a.start()
ここでは条件変数を用いて実現した
もちろん,複数の反発ロックを用いてマルチスレッドの秩序化実行を実現することもでき,前述の反発ロックの例で示した.
例2:相互反発ロック、条件変数、リストを使用して、スレッドの安全なキューを実現します.
# coding=utf-8
from threading import Thread
from threading import Lock,Condition
import random
class ThreadSafeQueue:
def __init__(self,max_size=0,blocking=True,timeout=None): #
self.max_size=max_size
self.blocking=blocking #
self.timeout=timeout #
self.lock = Lock()
self.cond = Condition(lock=self.lock) # , Condition
self.queue = []
def size(self): # self.queue , self.queue ,
self.lock.acquire()
size = len(self.queue)
self.lock.release()
return size
def batch_push(self,items):
if not isinstance(items,list):
items=list(items)
for item in items:
self.push(item)
def push(self,item):
self.cond.acquire()
while self.max_size>0 and len(self.queue)>=self.max_size:
if self.blocking:
res = self.cond.wait(timeout=self.timeout) # timeout , False
if not res:
self.cond.release()
return False
else:
self.cond.release()
return False
self.queue.append(item)
self.cond.notify()
self.cond.release()
return True
def pop(self):
self.cond.acquire()
while len(self.queue)<=0:
if self.blocking:
res=self.cond.wait(timeout=self.timeout)
if not res:
self.cond.release()
return False
else:
self.cond.release()
return False
item = self.queue.pop()
self.cond.notify() #
self.cond.release()
return item
def get(self,index):
self.lock.acquire()
try:
item = self.queue[index]
except:
item=None
self.lock.release()
return item
#
def produce(q,n):
for i in range(100000):
q.push(i)
print("Thread %d push %d" % (n,i))
def consumer(q,n):
count_none = 0 # q.pop() 10 while
while True:
item = q.pop()
if item is False:
count_none+=1
else:
count_none=0
print("Thread %d pop %d" % (n,item))
if count_none>=10:
break
#
if __name__=="__main__":
queue = ThreadSafeQueue(1000) # , , ,
# queue = ThreadSafeQueue(1000,timeout=1) # , , , 10
# queue = ThreadSafeQueue(1000,blocking=False) # , , ,
# , , , ,
t1 = Thread(target=produce,args=(queue,1))
t2 = Thread(target=produce,args=(queue,2))
t3 = Thread(target=consumer,args=(queue,3))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
次はConditionの下位層がどのように実現されているかを説明します:1.Conditionをインスタンス化するとき、Conditionの_init__条件変数オブジェクトの使用を保護するためのRLock再読み込みロックが生成されます.この錠をRと呼ぶ
2.wait()を実行する前にcond.acquire()を実行して条件変数に鍵をかけなければならない.上の鍵はRである. wait()を実行するとき、wait()はいくつかのことをしました. 2−1.wait()は反発ロックを作成し、この反発ロックをXと呼び、Xに対してacquire()ロック:X.acquire()を呼び出し、その後、ロックXを双方向キューQに配置する. 2-2.wait()ロックRを解放し、他のスレッドがロックRを取得し、いくつかのタスクコードを実行できるようにする 2-3.wait()はRを解放した後、Xに対してもう一度ロックします.X.acquire()Xに2回連続してロックするため、デッドロックが発生し、wait()がブロック状態に入る. だから条件変数のwait()はデッドロック方式でブロック待ちの機能を実現します!! 2-4.wait()のロックXが他のスレッドのnotify()によって解放されると、Rに再ロックされ、ロックXは二度と使用されません.次回wait()を呼び出すと新しいXロックが生成されます 3.他のスレッドはロックRを取得し、いくつかのタスクコードを実行した後、notify()を実行して前のスレッドのwait()を起動する notify()はいくつかのことをしました. 3−1.キューQヘッダからロックXをポップアップし、ロックXを解放する.ロックXを解放することによってwait()を起動する.その後この錠Xは永遠に使われません まとめ:Conditionの実装には2つのロックが使われています:_init__()時に作成される再ロックRとwait()は、条件変数と共有変数を保護するために作成された反発ロックX RのスレッドセキュリティXであり、スレッドセキュリティではなく、デッドロックを製造してブロック効果を達成するために使用される
Rは繰り返し使用され、Xは使い捨てで、毎回新しいXが生成されます
以下にCondition中__を貼り付けますinit__,wait()とnotify()のソース:
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock() # R。 lock lock。
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock() #### X ####
waiter.acquire() #### X ####
self._waiters.append(waiter) #### X Q ####
saved_state = self._release_save() #### R ####
gotit = False
try:
if timeout is None:
waiter.acquire() #### X , , ####
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state) #### X , R ####
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release() #### X ####
try:
all_waiters.remove(waiter) #### X Q ####
except ValueError:
pass
Pythonマルチスレッドとマルチプロセス(三)スレッドの同期の条件変数