python同時プログラミング--プロセス、スレッド、スレッド、ロック、プール、キュー
68551 ワード
文書ディレクトリ オペレーティングシステムの概念 プロセス multiprocessingモジュール デーモン マルチプロセスを使用して同時socketを実現するserver ロック 生産者消費者モデル データ共有 スレッドthreadingモジュール デーモンスレッドおよびスレッドロック 再帰錠とデッドロック(科学者が麺を食べる) キュー プール コモン geventモジュール asyncioモジュール
オペレーティングシステムの概念オペレーティングシステムヒューマンマシン矛盾:cpu 100%ワークI/O操作入出力メモリに対する複数のオペレーティングシステム:1つのプログラムがIOに遭遇すると、cpuを他の人に譲る順序の1つ1つの実行の構想が1台のコンピュータに共通に存在し、その中でパロディプログラムでcpuを実行させた後、もう一つのプログラムはcpuを引き続き使用してcpuの利用率を高めることができる単純な切り替えは時間を占有するが、マルチオペレーティングシステムの原理は全体的に時間を節約し、cpuの利用率を高める時空多重の概念単cpu分時オペレーティングシステム:プログラムごとに順番に1つのタイムスライスを実行し、タイムスライスが回転し、FCFSタイムスライスのリアルタイムオペレーティングシステムの分散オペレーティングシステムにユーザー体験を向上させた:大きなタスクを多くの小さなタスクに分解し、それぞれ異なるオペレーティングシステムに実行し、最後にまとめた.タスク分解可能celery python分散フレームワーク プロセス:進行中のプログラム.リソースの占有にはオペレーティングシステム呼び出しpidが必要です.プロセスコンピュータ内の最小のリソース割り当て単位を一意に識別できます.複数のプログラムが同時に実行されます.1つのcpuのみが、複数のプログラムが1つのcpu上で順番に実行されます.複数のプログラムを並列に実行し、複数のcpu上で同時に実行します.同期は、あることをするときに別のことを開始し、前回のイベントが終了してから非同期を実行する必要があります.Aイベントを行うときにBイベントを開始します.Aイベントが終了するのを待つ必要はありません.Bイベントブロックcpuが動作しない非ブロックcpu動作を開始できます.同期ブロック:input sleep(cpuが動作しない)非同期ブロックスレッドはプロセスの単位です.プロセス存在から離脱できないスレッドは、コンピュータにおいてcpuによってスケジューリング可能な最小単位プロセスの3つの状態図である:準備完了実行->ブロック->準備完了->実行プロセスのスケジューリングアルゴリズムは、すべてのプロセスにリソースを割り当てるか、cpu使用権を割り当てる方法短作業優先先サービスマルチレベルフィードバックアルゴリズム である. multiprocessingモジュール .デーモン マルチプロセスを使用して、同時socketのserver serverを実現する.py 錠 生産者消費者モデル データ共有 スレッドthreadingモジュール デーモンスレッドおよびスレッドロック 再帰錠とデッドロック(科学者が麺を食べる) キュー プール コモン geventモジュール asyncioモジュール
# multiple
# processing
# multiprocessing
# from multiprocessing import Process
# import time
# import os
# def func(name, age):
# # print(os.getpid(), os.getppid(), name, age) # pid process id ppid parent process id
# print(f' {name, age}')
# time.sleep(1)
# print(' ')
#
# if __name__ == '__main__':
# #
# # print('main:', os.getpid(), os.getppid())
# # p = Process(target=func, args=('alex', 82))
# # p.start() #
# arg_lst = [(' ', 78), ('alex', 89)]
# p_lst = []
# for arg in arg_lst:
# p = Process(target=func, args=arg)
# p.start()
# p_lst.append(p)
# for p in p_lst:
# p.join()
# print(' ')
#
#
# ,
# join
# p.join() # p , p
#
from multiprocessing import Process
n = 0
def func():
global n
n += 1
if __name__ == '__main__':
p_lst = []
for i in range(10):
p = Process(target=func)
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
print(n) # 0 n n
# socket server
プロセスを開始する別の方法import os
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, a, b, c):
self.a = a
self.b = b
self.c = c
super().__init__()
def run(self):
print(os.getppid(), os.getpid(), self.a, self.b, self.c)
if __name__ == '__main__':
print(os.getpid())
p = MyProcess(1, 2, 3)
p.start()
# p.terminate() #
from multiprocessing import Process
import time
def son1():
while True:
print('in son1')
time.sleep(1)
def son2():
for i in range(10):
print('in son2')
time.sleep(1)
if __name__ == '__main__':
p1 = Process(target=son1)
p1.daemon = True # p1
p1.start()
p2 = Process(target=son2)
p2.start()
p2.join()
time.sleep(5)
print('in main')
# 5 in son1
# ,
# ,
# , ,
# p2
import socket
from multiprocessing import Process
def talk(conn):
while True:
msg = conn.recv(1024).decode('utf-8')
ret = msg.upper().encode('utf-8')
conn.send(ret)
conn.close()
if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1', 9000))
sk.listen()
while True:
conn, addr = sk.accept()
Process(target=talk, args=(conn,)).start()
sk.close()
client.py import socket
import time
sk = socket.socket()
sk.connect(('127.0.0.1', 9000))
while True:
sk.send(b'hello')
msg = sk.recv(1024).decode('utf-8')
print(msg)
time.sleep(0.5)
sk.close()
# acquire
#
#
import json
import time
from multiprocessing import Process, Lock
def search(i):
with open('ticket', encoding='utf-8') as f:
ticket = json.load(f)
print(f"{i}: {ticket['count']}")
def buy_ticket(i):
with open('ticket', encoding='utf-8') as f:
ticket = json.load(f)
if ticket['count'] > 0:
ticket['count'] -= 1
print(f'{i} ')
time.sleep(0.2)
with open('ticket', mode='w', encoding='utf-8') as f:
json.dump(ticket, f)
def get_ticket(i, lock):
search(i)
# lock.acquire()
# buy_ticket(i)
# lock.release()
#
with lock: # , ,
buy_ticket(i)
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=get_ticket, args=(i, lock)).start()
#
# import time
# from multiprocessing import Lock, Process
#
# def func(i, lock):
# lock.acquire() #
# print(f' {i}')
# time.sleep(1)
# lock.release() #
#
#
# if __name__ == '__main__':
# lock = Lock()
# for i in range(10):
# Process(target=func, args=(i, lock)).start()
#
# (IPC) Inter Process Communication
# : Queue
# # :
# from multiprocessing import Queue, Process
#
# def son(q):
# q.put('hello')
#
# if __name__ == '__main__':
# q = Queue()
# Process(target=son, args=(q, )).start()
# print(q.get())
# # son mian
#
#
#
#
# from multiprocessing import Process, Queue
# import time
# import random
#
# def consumer(q):# :
# for i in range(10):
# print(q.get())
#
#
# def producer(q): # :
# for i in range(10):
# time.sleep(random.random())
# q.put(i)
#
#
# if __name__ == '__main__':
# q = Queue()
# c1 = Process(target=consumer, args=(q,))
# p1 = Process(target=producer, args=(q,))
# c1.start()
# p1.start()
# import requests
# from multiprocessing import Process, Queue
#
# def producer(i, url, q):
# ret = requests.get(url)
# q.put((i, ret.status_code))
# # print(ret.status_code)
#
# if __name__ == '__main__':
# q = Queue()
# url_lst = ['https://www.cnblogs.com/Eva-J/p/7277026.html',
# 'https://blog.csdn.net/qq_31910669',
# 'https://blog.csdn.net/qq_31910669/article/details/109136837']
# # for i, url in enumerate(url_lst):
# # producer(i, url, '1')
# for index, url in enumerate(url_lst):
# Process(target=producer, args=(index, url, q)).start()
# for i in range(3): # ,
# print(q.get())
import requests
from multiprocessing import Process, Queue
def producer(i, url, q):
ret = requests.get(url)
q.put((i, ret.text))
# print(ret.status_code)
def consumer(q):
while True:
res = q.get()
if res is None:
break
with open(f'{res[0]}.html', mode='w', encoding='utf-8') as f:
f.write(res[1])
if __name__ == '__main__':
q = Queue()
url_dic = {
'cn':'https://www.cnblogs.com/Eva-J/p/7277026.html',
'mu':'https://blog.csdn.net/qq_31910669',
'li':'https://blog.csdn.net/qq_31910669/article/details/109136837'}
p = []
for key in url_dic:
p1 = Process(target=producer, args=(key, url_dic[key], q))
p1.start()
p.append(p1)
Process(target=consumer, args=(q,)).start()
for p2 in p:
p2.join()
q.put(None)
# , lock
from multiprocessing import Process, Manager, Lock
def change_dic(dic, lock):
with lock:
dic['count'] -= 1
if __name__ == '__main__':
m = Manager()
dic = m.dict({
'count':100})
lock = Lock()
p_lst = []
for i in range(50):
p = Process(target=change_dic, args=(dic, lock))
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
print(dic)
# : , , ,
# multiprocessing, start join
# Lock
# :
# ( )
# :redis
# Manager
#
#
# :
#
# cpu ?
# , ,
# CPython
# gc
# +
# gc
# (GIL)global interpreter lock
# cpu
# IO
# threading
import time
from threading import Thread, current_thread, enumerate, active_count
# current_thread() id current_thread().ident id
def func(i):
print(f'start{i},{current_thread()}')
time.sleep(1)
print(f'end{i}')
if __name__ == '__main__':
t1 = []
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
t1.append(t)
print(enumerate(), active_count()) # 11
for t in t1:
t.join()
# print(enumerate(), active_count()) # 1 1
print(' ')
# terminate
#
# enumerate
# active_count
#
#
# import time
# from threading import Thread
#
# def son():
# while True:
# print('in son')
# time.sleep(1)
# def son2():
# for i in range(10):
# print('in son2')
# time.sleep(1)
# t = Thread(target=son)
# t.daemon = True # ,
# Thread(target=son2).start() #
# t.start()
#
# ?
#
#
# ?
#
#
from threading import Thread,Lock
n = 0
def add(lock):
with lock:
for i in range(200000):
global n
n += 1
def sub(lock):
with lock:
for i in range(200000):
global n
n -= 1
lock = Lock()
Thread(target=add, args=(lock,))
Thread(target=sub, args=(lock,))
print(n)
# += -= +
# append pop
#
# RLock (Recursion)
# acquire
# , ,
# , , , , . ,
# from threading import Lock, RLock
# rl = RLock()
# rl.acquire()
# rl.acquire()
# rl.acquire()
# rl.acquire()
# print(' ')
# rl.acquire()
# rl.acquire()
# rl.acquire()
# rl.acquire()
# l = Lock()
# l.acquire()
# print(' ')
# l.release()
# from threading import RLock, Thread
#
# def func(i, lock):
# lock.acquire()
# lock.acquire()
# lock.acquire()
# print(f'{i}:start')
# lock.release()
# # lock.release()
# # lock.release()
# print(f'{i}:end')
# lock = RLock()
# for i in range(5):
# Thread(target=func, args=(i, lock)).start()
#
#
from threading import Lock, Thread, RLock
import time
# noodle_lock = Lock()
# fork_lock = Lock()
noodle_lock = fork_lock = RLock() #
def eat(name):
noodle_lock.acquire() #
print(f'{name} ')
fork_lock.acquire()
print(f'{name} ')
print(f'{name} ')
time.sleep(0.1)
fork_lock.release()
print(f'{name} ')
noodle_lock.release()
print(f'{name} ')
def eat2(name):
fork_lock.acquire()
print(f'{name} ')
noodle_lock.acquire() #
print(f'{name} ')
print(f'{name} ')
time.sleep(0.1)
noodle_lock.release()
print(f'{name} ')
fork_lock.release()
print(f'{name} ')
Thread(target=eat, args=('alex',)).start()
Thread(target=eat2, args=('taibai',)).start()
Thread(target=eat, args=('wusie',)).start()
Thread(target=eat2, args=('dazhuang',)).start()
# :
# ,
# , ,
# import queue #
#
# q = queue.Queue(4) # FIFO
# # q.put(1)
# q.put(2)
# # q.get()
# q.put(3)
# q.put(4)
# print('4 done')
# q.put(5)
# print('5 done')
# q.get_nowait()
# q.put_nowait()
from queue import LifoQueue, PriorityQueue
# LifoQueue last in first out
# PriorityQueue
priq = PriorityQueue()
priq.put((2, 'alex'))
priq.put((5, 'xia'))
priq.put((0, 'ming'))
print(priq.get())
print(priq.get())
print(priq.get())
# (0, 'ming')
# (2, 'alex')
# (5, 'xia')
#
# Queue
# LifoQueue
# PriorityQueue
#
# ,
# ,
#
# / ,
# ,
# / /
# / ,
# ,
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread
import time
import os
# threading
# multiprocessing threading Pool
# concurrent.futures /
# def func(a, b):
# print(current_thread().ident, 'start', a, b)
# time.sleep(1) # ,
# print(current_thread().ident, 'end')
#
#
# tp = ThreadPoolExecutor(4)
# for i in range(20):
# tp.submit(func, i, b=i+1) # ,
#
# , submit
#
def func(a, b):
print(current_thread().ident, 'start', a, b)
time.sleep(1) # ,
print(current_thread().ident, 'end')
return a*b
if __name__ == '__main__':
pp = ProcessPoolExecutor(4)
future_l = []
for i in range(20):
ret = pp.submit(func, i, b=i+1) # ,
# print(ret) # , , result
# print(ret.result())
future_l.append(ret) #
for ret in future_l:
print(ret.result())
# map
# ret = pp.map(func, range(20))
# for key in ret:
# print(key)
# ?????????
"""
IO ,
"""
# IO
# gevent greenlet + IO
# asyncio yeild
# import gevent
#
# def func(): # IO , func gevent
# print('start func')
# gevent.sleep(1)
# print('end func')
#
# g1 = gevent.spawn(func)
# g2 = gevent.spawn(func)
# g3 = gevent.spawn(func)
# gevent.joinall([g1, g2, g3])
# # g1.join() # g1
# # g1.join() # g1
# # g1.join() # g1
from gevent import monkey
monkey.patch_all()
import time
import gevent
def func(): # IO , func gevent
print('start func')
time.sleep(1)
print('end func')
g1 = gevent.spawn(func)
g2 = gevent.spawn(func)
g3 = gevent.spawn(func)
gevent.joinall([g1, g2, g3])
# g1.join() # g1
# g1.join() # g1
# g1.join() # g1
import asyncio
async def func(name):
print(f'start{name}')
# await
# await async , py3.8
await asyncio.sleep(1)
print('end')
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([func('alex'), func('taibai')])) #
#