実例解析によるPython RPCの原理と方法


単スレッド同期
  • socketを用いてデータを転送する
  • jsonプログレッシブメッセージ
  • を使用する。
  • structはメッセージをバイナリバイト列に符号化し、ネットワーク伝送を行う

  • メッセージプロトコル
    
    //   
    {
      in: "ping",
      params: "ireader 0"
    }
    
    //   
    {
      out: "pong",
      result: "ireader 0"
    }
    クライアントclient.py
    
    # coding: utf-8
    # client.py
    
    import json
    import time
    import struct
    import socket
    
    
    def rpc(sock, in_, params):
      response = json.dumps({"in": in_, "params": params}) #      
      length_prefix = struct.pack("I", len(response)) #       
      sock.sendall(length_prefix)
      sock.sendall(response)
      length_prefix = sock.recv(4) #       
      length, = struct.unpack("I", length_prefix)
      body = sock.recv(length) #      
      response = json.loads(body)
      return response["out"], response["result"] #          
    
    if __name__ == '__main__':
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      s.connect(("localhost", 8080))
      for i in range(10): #     10 rpc  
        out, result = rpc(s, "ping", "ireader %d" % i)
        print out, result
        time.sleep(1) #   1s,    
      s.close() #     

    サービスエンドblocking_single.py
    
    # coding: utf8
    # blocking_single.py
    
    import json
    import struct
    import socket
    
    
    def handle_conn(conn, addr, handlers):
      print addr, "comes"
      while True: #     
        length_prefix = conn.recv(4) #       
        if not length_prefix: #      
          print addr, "bye"
          conn.close()
          break #     ,       
        length, = struct.unpack("I", length_prefix)
        body = conn.recv(length) #      
        request = json.loads(body)
        in_ = request['in']
        params = request['params']
        print in_, params
        handler = handlers[in_] #        
        handler(conn, params) #     
    
    
    def loop(sock, handlers):
      while True:
        conn, addr = sock.accept() #     
        handle_conn(conn, addr, handlers) #     
    
    
    def ping(conn, params):
      send_result(conn, "pong", params)
    
    
    def send_result(conn, out, result):
      response = json.dumps({"out": out, "result": result}) #      
      length_prefix = struct.pack("I", len(response)) #       
      conn.sendall(length_prefix)
      conn.sendall(response)
    
    
    if __name__ == '__main__':
      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #     TCP   
      sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #   reuse addr  
      sock.bind(("localhost", 8080)) #     
      sock.listen(1) #        
      handlers = { #        
        "ping": ping
      }
      loop(sock, handlers) #       

    マルチスレッド同期
  • スレッドライブラリthreadを用いて原生スレッド
  • を作成する。
  • サーバは、複数のクライアント
  • を並列に処理することができる。
    サービスエンドmultihread.py

    プロセス同期
  • PythonのGILは、単一のプロセスがCPUのコアをいっぱい占めているだけであり、マルチスレッドがマルチコアの利点を利用できないことをもたらしている。
  • OS.fork()は、サブプロセス
  • を生成する。
  • サブルーチンが終了した後、親プロセスはwaitpidシステムを使用して刈り取りプロセスを呼び出さなければならず、これをゾンビリソースと呼ぶことを防止する
  • サブルーチンでサーバソケットを閉じた後、親プロセスでもサーバソケット
  • を閉じます。
  • プロセスforkの後、親子プロセスは自分のソケット参照がカーネルに向けられている同じソケットオブジェクトを持っています。ソケット参照は2と数えられます。ソケットプロセスcloseに対して、ソケットオブジェクトの参照は1カウントダウンします。
  • PreForking同期
  • プロセスはスレッドに比べてリソースを消費し、PreForkingプロセス・プールモデルによってサーバ開発のプロセス数を制限し、サーバ負荷が過重な
  • を回避する。
  • 並列接続数がpreforkプロセス数を超えた場合、その後のクライアント要求はブロックされます。
  • プロセス
  • は、イベントポーリングAPIを通じて、関連ソケットに応答する読み書きイベントがあるかどうかを問い合わせる。ある場合は、イベントリストを持って戻り、ない場合は、ブロック
  • をブロックする。
  • 読み書きイベントを取得した後、イベントに関するソケットを読み書きすることができます。
  • 読み書きバッファ
  • を設定します。
  • NFingx/Nodejs/Redisは、非同期モデル
  • に基づいている。
  • 非同期モデルは符号化コストが高く、ミスしやすいです。通常は会社の業務コードに同期モデルを採用しています。高同時高性能を重視する場合のみ、非同期モデルを使用します。
  • PreForking非同期
    Tornado/Ngixは多プロセスProForking非同期モデルを採用しており、高合併処理能力を持っています。

    以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。