python分散プロセス管理

2291 ワード

分散プロセス管理の理解マルチプロセスを複数のマシンに分散し、1つのサービスプロセスをスケジューラとして、ネットワーク通信によってタスクを複数のプロセスに割り当てることができ、Queue通信を介したマルチプロセスプログラムがあれば、タスクを送信するプロセスと処理タスクのプロセスを2つの異なるホストに分散し、既存のqueueを引き続き使用することができる.managersモジュールを介してQueueをネットワークを介して露出するだけで,他の機器のプロセスにQueueにアクセスさせることができる.サービスプロセス:Queueを起動し、Queueをネットワークに登録し、Queueにタスクを書きます.管理側
import random
from queue import Queue
from multiprocessing.managers import BaseManager

# 1.                 
task_queue = Queue()
result_queue = Queue()


# 2.            ,            ;
class QueueManager(BaseManager):
    pass


# 3.             ,
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)


#4.       5001,                 /   ;
#       0~65535
manager = QueueManager(address=('', 9999), authkey=b'westos')

# 5.           
manager.start()

# 6.            
task = manager.get_task_queue()
result = manager.get_result_queue()



# 7.        ;
for i in range(100):
    n = random.randint(1, 1000)
    print('        : %s' %(n))
    task.put(n)


# 8.  result        ;
for j in range(100):
    r = result.get()
    print("       : %s" %(r))


# 8.   
manager.shutdown()


被管理者側が分散環境でクエストをQueueに追加した場合、元のtask_に直接タスクを追加することはできません.Queueは操作を行い、それでQueueManagerのパッケージを迂回し、manager.を通過しなければならない.get_task_Queue()取得したQueueインタフェースの追加

from multiprocessing.managers import BaseManager
import queue

#1.  Queue
import time


class QueueManager(BaseManager):
    pass


#2.            ,              

QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

#3.       
server_addr='172.25.254.77'
print("       %s......" %(server_addr))

#4.               ;
m=QueueManager(address=(server_addr,9999),authkey=b'westos')

#5.    
m.connect()


#6.  Queue  
task=m.get_task_queue()
result=m.get_result_queue()

#7.          
for i in range(100):
    try:
        n=task.get()
        #      
        print('run task %d*%d...' %(n,n))
        time.sleep(1)
        r='%d * %d = %d' %(n,n,n**2)
        #               
        result.put(r)
    except queue.Empty:
        print('      ..')
print('    ')