Python学習ノート:スレッドとプロセス(合)、分散プロセス


前言
最近深く勉强して、すでにいくつかの模型を出しましたが、Pyhtonの基础がしっかりしていないので、Pythonの补习を始めました.Pythonのインストールについては、Pythonのインストールについて説明します.Pythonのインストールについては、Pythonのインストールについて説明します.Pythonのインストールについては、Pythonの実行モードと入出力については、Python IO Pythonの基本概念について説明します.PythonベースのPython文字列とコードについて説明します.Python文字列と符号化Pythonの基本データ構造:listとtupleの紹介を参照してください.この紹介を参照してください.Python listとtuple Python制御文の紹介:ifelse、この紹介を参照してください.Python条件判断Python制御文の紹介:ループ実装、Pythonループ文Pythonデータ構造:dictとset紹介Pythonデータ構造dictとset Python関数関連:Python関数Python高次特性:Python高級特性Python高次関数:Python高次関数Python匿名関数:Python匿名関数Python装飾器:Python装飾器Python偏関数:Python偏関数Pythonモジュール:PythonモジュールPythonオブジェクト向けプログラミング(1):Pythonオブジェクト向けPythonオブジェクト向けプログラミング(2):Pythonオブジェクト向け(2)Pythonオブジェクト向けプログラミング(3):Pythonオブジェクト向け(3)Pythonオブジェクト向けプログラミング(4):Pyhtonオブジェクト向け(4)Pythonオブジェクト向け高度プログラミング(上):Pythonオブジェクト向け高度プログラミング(上):Pythonオブジェクト向けプログラミング(上)Pythonオブジェクト向け高度プログラミング(中上):Pythonオブジェクト向け高度プログラミング(中上)Pythonオブジェクト向け高度プログラミング(中下):Pythonオブジェクト向け高度プログラミング(中下)Pythonオブジェクト向け高度プログラミング(完):Pythonオブジェクト向け高度プログラミング(完)Pythonエラーデバッグ(起):Pythonデバッグ:起Pythonエラーデバッグ(承):Pythonデバッグ:承Pythonエラーデバッグ(転):Pythonデバッグ:転Pythonエラーデバッグ(合):pythonデバッグ:合PythonファイルIOプログラミング:PythonファイルIO PythonファイルIOプログラミング2:PythonファイルIO 2 PythonファイルIOプログラミング3:PYthonファイルIO 3 Pythonプロセスとスレッド(起):Pythonプロセスとスレッド起Pythonプロセスとスレッド(承):PythonプロセスとスレッドPythonプロセスとスレッド(転):Pythonプロセスとスレッド転
目次:
  • 前言
  • ディレクトリ:
  • 分散プロセス
  • 小結

  • 分散プロセス
    ThreadおよびProcessでは、Processがより安定であり、かつ、Processが複数の機器に分散することができ、Threadは最大で同じ機器の複数のCPUにしか分散できないため、Processが好ましい.Pythonのmultiprocessingモジュールはマルチプロセスをサポートするだけでなく、managersサブモジュールはマルチプロセスを複数のマシンに分散することもサポートします.1つのサービス・プロセスは、スケジューラとして、ネットワーク通信に依存して他の複数のプロセスにタスクを分散することができる.managersモジュールはパッケージングがよいため,ネットワーク通信の詳細を理解する必要がなく,分散型マルチプロセスプログラムを容易に記述できる.例えば、Queue通信を介したマルチプロセスプログラムが同じマシン上で実行されている場合、現在、タスクを処理するプロセスタスクが重いため、タスクを送信するプロセスとタスクを処理するプロセスを2台のマシンに分散したいと考えています.分散プロセスでどのように実現しますか?従来のQueueは引き続き使用可能であったが,managersモジュールを介してQueueをネットワークを介して暴露することで,他の機器のプロセスにQueueにアクセスさせることができるようになった.まずサービスプロセスを見てみましょう.サービスプロセスはQueueを起動し、Queueをネットワークに登録し、Queueにタスクを書き込みます.
    # task_master.py
    
    import random, time, queue
    from multiprocessing.managers import BaseManager
    
    #        :
    task_queue = queue.Queue()
    #        :
    result_queue = queue.Queue()
    
    #  BaseManager   QueueManager:
    class QueueManager(BaseManager):
        pass
    
    #    Queue       , callable     Queue  :
    QueueManager.register('get_task_queue', callable=lambda: task_queue)
    QueueManager.register('get_result_queue', callable=lambda: result_queue)
    #     5000,      'abc':
    manager = QueueManager(address=('', 5000), authkey=b'abc')
    #   Queue:
    manager.start()
    #          Queue  :
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    #        :
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    #  result      :
    print('Try get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Result: %s' % r)
    #   :
    manager.shutdown()
    print('master exit.')

    1台のマシンにマルチプロセスプログラムを書く場合、作成したQueueは直接使用できますが、分散型マルチプロセス環境ではQueueにタスクを追加して元のtask_に直接使用することはできません.Queueは操作を行い、それでQueueManagerのパッケージを迂回し、manager.を通過しなければならない.get_task_Queue()で取得したQueueインタフェースを追加します.
    次に、別のマシンでタスクプロセスを開始します(ローカルで起動しても構いません):
    # task_worker.py
    
    import time, sys, queue
    from multiprocessing.managers import BaseManager
    
    #      QueueManager:
    class QueueManager(BaseManager):
        pass
    
    #     QueueManager       Queue,          :
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')
    
    #       ,     task_master.py   :
    server_addr = '127.0.0.1'
    print('Connect to server %s...' % server_addr)
    #            task_master.py       :
    m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    #      :
    m.connect()
    #   Queue   :
    task = m.get_task_queue()
    result = m.get_result_queue()
    #  task     ,      result  :
    for i in range(10):
        try:
            n = task.get(timeout=1)
            print('run task %d * %d...' % (n, n))
            r = '%d * %d = %d' % (n, n, n*n)
            time.sleep(1)
            result.put(r)
        except Queue.Empty:
            print('task queue is empty.')
    #     :
    print('worker exit.')

    タスクプロセスは、ネットワークを介してサービスプロセスに接続されるため、サービスプロセスのIPを指定します.
    これで、分散プロセスの作業効果を試してみましょう.task_を先に起動master.pyサービスプロセス:
    $ python3 task_master.py 
    Put task 3411...
    Put task 1605...
    Put task 1398...
    Put task 4729...
    Put task 5300...
    Put task 7471...
    Put task 68...
    Put task 4219...
    Put task 339...
    Put task 7866...
    Try get results...

    task_master.pyプロセスは、タスクを送信した後、resultキューの結果を待機します.task_を開始しますworker.pyプロセス:
    $ python3 task_worker.py
    Connect to server 127.0.0.1...
    run task 3411 * 3411...
    run task 1605 * 1605...
    run task 1398 * 1398...
    run task 4729 * 4729...
    run task 5300 * 5300...
    run task 7471 * 7471...
    run task 68 * 68...
    run task 4219 * 4219...
    run task 339 * 339...
    run task 7866 * 7866...
    worker exit.

    task_worker.pyプロセス終了、task_master.pyプロセスでは、結果が印刷され続けます.
    Result: 3411 * 3411 = 11634921
    Result: 1605 * 1605 = 2576025
    Result: 1398 * 1398 = 1954404
    Result: 4729 * 4729 = 22363441
    Result: 5300 * 5300 = 28090000
    Result: 7471 * 7471 = 55815841
    Result: 68 * 68 = 4624
    Result: 4219 * 4219 = 17799961
    Result: 339 * 339 = 114921
    Result: 7866 * 7866 = 61873956

    この簡単なMaster/Workerモデルは何の役に立ちますか?実はこれは1つの簡単ですが本当の分布式の計算で、コードを少し改造して、複数のworkerを起動して、任務を何台か甚だしきに至っては数十台の機械の上で分布することができて、例えばn*nを計算するコードをメールを送信することに変えて、メールのキューの非同期の送信を実現しました.
    Queueオブジェクトはどこに保存されますか?task_に気づくworker.pyにはQueueを作成するコードがまったくないので、Queueオブジェクトはtask_に格納されます.master.pyプロセス:
                                                 │
    ┌─────────────────────────────────────────┐     ┌──────────────────────────────────────┐
    │task_master.py                           │  │  │task_worker.py                        │
    │                                         │     │                                      │
    │  task = manager.get_task_queue()        │  │  │  task = manager.get_task_queue()     │
    │  result = manager.get_result_queue()    │     │  result = manager.get_result_queue() │
    │              │                          │  │  │              │                       │
    │              │                          │     │              │                       │
    │              ▼                          │  │  │              │                       │
    │  ┌─────────────────────────────────┐    │     │              │                       │
    │  │QueueManager                     │    │  │  │              │                       │
    │  │ ┌────────────┐ ┌──────────────┐ │    │     │              │                       │
    │  │ │ task_queue │ │ result_queue │ │

    Queueがネットワークを介してアクセスできるのは,QueManagerによって実現される.QueueManagerはQueueを1つ以上管理しているため、get_task_queue.
    authkeyは何の役に立つの?これは2台の機械が正常に通信し、他の機械に悪意を持って干渉されないことを保証するためだ.task_worker.pyのauthkeyとtask_master.pyのauthkeyが一致せず、接続できないに違いない.
    小結
    Pythonの分散プロセスインタフェースは簡単で、パッケージが良好で、煩雑なタスクを複数の機械に分散する必要がある環境に適しています.Queueの役割は、各タスクの記述データ量をできるだけ小さくするために、タスクを伝達し、結果を受信するために使用されることに注意してください.たとえば、ログ・ファイルを処理するタスクを送信すると、数百兆のログ・ファイル自体を送信するのではなく、ログ・ファイルが格納されている完全なパスを送信し、Workerプロセスが共有するディスク上でファイルを読み取ります.