RPCXフレーム

21270 ワード

RPCX
  • RPCXはGolangがjava生態圏の流行の枠組みを参考にして構築した機能豊富なマイクロサービスプラットフォームであり、高性能で誤りを許容できるプラグイン式のRPCフレームワークを実現した.
  • RPCXの目標:簡単:学習しやすく、開発しやすく、統合しやすく、発表しやすい高性能:grpc-goをはるかに上回っており、dubboとmotanサービスの発見とサービスガバナンスは言うまでもない:大規模なマイクロサービスクラスタのプラットフォーム間の開発を容易にする:rpcx 3.0の下層は標準rpcライブラリではなく、プラットフォーム間のバイナリプロトコルを採用している.効率的だが便利な多言語開発
  • その特徴は、
  • を含む.
            1、    go  ,         
            2、      ,            、tracing 
            3、   TCP、HTTP、QUIC、KCP   
            4、        ,   JSON、Protobuf、MessagePack         
            5、            、     、zookeeper、etcd、consul、mDNS       
            6、     Failover、Failfast、Failtry     
            7、          、    、     、       、                     
            8、    
            9、        (   )
            10、      
            11、    heartbeat     
            12、  metrics、log、timeout、  、   、TLS   
    
  • RPCX仕様:
  •            :
            1、   
            2、      (context.Context,args ,args2),              
            3、        
        *              ,                  
    
        *          :       TCP/UDP        ,           
            *   func NewServer(options ...OptionFn) *Server {
                    s := &Server{
                        Plugins: &pluginContainer{},
                        options: make(map[string]interface{}),
                    }
    
                    for _, op := range options {
                        op(s)
                    }
                    return s
                }//  ,       Server{}            
                    type Server struct {
                        Plugins PluginContainer//          
                        // AuthFunc       ,        
                        AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
                        //               
                        }
                //rpcx        OptionFn    :
                    func WithReadTimeout(readTimeout time.Duration) OptionFn//   
                    func WithTLSConfig(cfg *tls.Config) OptionFn//tls  
                    func WithWriteTimeout(writeTimeout time.Duration) OptionFn//   
                *   config := &tls.Config{Certificates: []tls.Certificate{cert}}
                     s := server.NewServer(server.WithTLSConfig(config))//TLS  TCP 
    
  • サービス登録:Server.Register()とServer.RegisterName()
  • Rpcxでは、純粋な関数をサービスとして登録することもサポートされています.関数は、エクスポート可能またはエクスポート不可の3つのパラメータを受け入れることができます.1つ目はcontext.contextタイプで、他の2つはエクスポート可能(または内蔵)のタイプです.3番目のパラメータは、ポインタにerrorタイプの戻り値RegisterFunction("service",function")/純関数をサービス
  • に登録する
  • サービス起動:
  •            func (s *Server) Close() error
               func (s *Server) RegisterOnShutdown(f func())
               func (s *Server) Serve(network, address string) (err error)//  TCP/UDP     
               func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)// HTTP     
                 
              
           * rpcx       
         
               tcp:     
               http:     http    
               unix: unix domain sockets
               reuseport:    SO_REUSEPORT socket   ,     Linux kernel 3.9+
               quic: support quic protocol
               kcp: sopport kcp protocol
    
       *          :
           d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
           xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
           defer xclient.Close()
           xclient.Call()
           func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
       *       :
               
               func (client *Client) Close() error
               func (c *Client) Connect(network, address string) error
               func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
               func (client *Client) IsClosing() bool
               func (client *Client) IsShutdown() bool
           *   XClient  Client   ,  
           Broadcast             ,                。  FailMode   SelectMode       。          。
           Fork             ,                。  FailMode   SelectMode       。
    *   rpcx  network @ Host: port        。 network    tcp , http ,unix ,quic kcp。 Host       IP  。
    
    #######       
    1、Peer2Peer
    2、MultiServer:
                :
       d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
       xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
       defer xclient.Close()
       MultipleServersDiscovery.Update()       
    3、ZooKeeper
    .....
    #######   rpcx  4      :
       *   Json
       *   Protobuffer//google
       *   MsgPack//         
       *   SerializeNone//       ,     []byte
                          ,     json          json
    #######         :
    ```bash
      MsgPack
    type MsgpackCodec struct{}//           
    
    // Encode encodes an object into slice of bytes.
    func (c MsgpackCodec) Encode(i interface{}) ([]byte, error) {
       var buf bytes.Buffer
       enc := msgpack.NewEncoder(&buf)
       //enc.UseJSONTag(true)
       err := enc.Encode(i)
       return buf.Bytes(), err
    }
    
    // Decode decodes an object from slice of bytes.
    func (c MsgpackCodec) Decode(data []byte, i interface{}) error {
       dec := msgpack.NewDecoder(bytes.NewReader(data))
       //dec.UseJSONTag(true)
       err := dec.Decode(i)
       return err
    }
    //                 ,                        
    
         Gob: 
    
    func main() {
       flag.Parse()
    
       share.Codecs[protocol.SerializeType(4)] = &GobCodec{}//       
       s := server.NewServer()
       //s.RegisterName("Arith", new(example.Arith), "")
       s.Register(new(example.Arith), "")
       s.Serve("tcp", *addr)
    }
    
    type GobCodec struct {
    }
    
    func (c *GobCodec) Decode(data []byte, i interface{}) error {
       enc := gob.NewDecoder(bytes.NewBuffer(data))
       err := enc.Decode(i)
       return err
    }
    
    func (c *GobCodec) Encode(i interface{}) ([]byte, error) {
       var buf bytes.Buffer
       enc := gob.NewEncoder(&buf)
       err := enc.Encode(i)
       return buf.Bytes(), err
    }
            
    
    資格認定
    サービス側:tokenの検証に使用する認証関数を定義する必要があります.server.AuthFuncは検証関数を受信します.
    nc auth(ctx context.Context, req *protocol.Message, token string) error {
        if token == "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwYXNzd29yZCI6IjE5OTgzNyIsInVzZXJuYW1lIjoiaGFkZXMifQ.zg1dDuhn393DVyIH_1s7eSj74JsBw_vLYbUiq72tAm8" {
            return nil
        }
     return errors.New("wrong token")
    }
    s.AuthFunc=auth
       :
    client.Auth(token)//    Auth()          
    
    
    エラーモード
  • RPCXには、4つのエラーパターンが含まれています.1、FailFastは、1つのノードへのアクセスエラーが発生するとすぐにエラーを返します.2、Failtryは、1つのノードへのアクセスが成功するまで、または最大アクセス回数に達するまで繰り返しアクセスします.3、FailBackup:この場合、アクセスしたノードが一定時間以内に結果を返さない場合、別のノードにアクセスします.このモードは、2つのノード4にのみ適用され、Failoverは、1つのノードへのアクセスにエラーが発生した場合、ノードへのアクセスが成功するか、失敗した最大しきい値に達するまで、他の打鍵ノードへのアクセスを継続します.
  • 失敗モード設定はXclient作成時に設定が、同期モードのみのため、非同期モードではこのパラメータは意味のない
  • である.
  • アクセス時間と再試行回数Optionの関連オプションで
  • を設定する.
    Forkモード
  • このモードはFailBackupに似ていますが、サービスを所有するすべてのノードに対して、ノードが正常に戻るとクライアントに返されます.2つに限らず、xclient.Fork()を使用してパラメータを直接呼び出します.Call
  • と同じです.
    ブロードキャストモード
  • このモードは、すべてのサービスノードが正常に戻ることを要求し、任意のノードエラーがある場合、エラー
  • を返す.
    ルート
  • サービス規模が大きい場合、通常、負荷等化のためにマイクロサービスクラスタがあり、この場合、1つのサービスが複数のノードに配置され、クライアントがサービスにアクセスするノードを選択するプロセスがRPCXのルーティングである.
  • 分類:
  • Random_selector:ランダム選択ノードアクセス(RandomSelect)
  • Roundrobin_selectorポーリング方式アクセス、各ノードが(RoundRobin)
  • にアクセスできることを保証する
  • Weighted_selector:Nginxと同じようにスムーズなウェイトベースのポーリング(WeightedRoundRobin)
  • Ping_selector:ネットワーク品質優先、順次ping選択ネットワーク品質最高(WeightedICMP)
  • hash_selector:コンシステンシハッシュ、JumpConsistentHashを使用してノードを選択し、同じサービスPath、サービスMethod、およびパラメータが同じノードにルーティングされます.JumpConsistentHashはコンシステンシハッシュを迅速に計算するアルゴリズムですが、ノードを削除できないという欠点があり、ノードを削除するとルーティングが不正確になるので、ノードが変動したときにコンシステンシハッシュを再計算します.(ConsistentHash)
  • geo_selector:地理的優先:サービス登録時に所在する経緯度を指定し、経緯度の相対位置に基づいて最適な
  • を選択する必要がある
  • user_selecor:カスタムルーティング規則(SelectByUser)
  • ルーティング設定Xclient作成時に
  • を指定する.
    メタデータ(metadata)
  • メタデータは、通信中の要求または応答データではなく、補助ラベルなどの情報であり、文字列キー値ペアである.
  • クライアントが送信メタデータを読み込むにはコンテキストでshare.ReqMetaDataKeyを設定する必要があり、受信にはshare.ResMetaDataKeyを設定する必要がある:=context.WithValue(context.Background()、share.ReqMetaDataKey、map[string]string{"test")ctx=context.WithValue(ctx,share.ResMetaDataKey,make(map[string]string))
  • サービス:reqMeta:=ctx.Value(share.ReqMetaDataKey).(map[string]string)resMeta:=ctx.Value(share.ResMetaDataKey).(map[string]string)
  • Option
    type Option struct {
        // group        
        Group string
        /*
         option.Group           ,         ,                server.Register("Arith","group=test")
                                    option.Group="test"
        */
    
        //     
        Retries int
    
        // tcp quic tls  
        TLSConfig *tls.Config
        // kcp.BlockCrypt
        Block interface{}
        // RPCPath for http connection
        RPCPath string
        /      
        ConnectTimeout time.Duration
        //   
        ReadTimeout time.Duration
        //      
        WriteTimeout time.Duration
    
        // BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time.//  FailBackup  
        BackupLatency time.Duration
    
        //          
        GenBreaker func() Breaker
        /*
        Breaker
           ,                                     ,        ,        
        */
    
        SerializeType protocol.SerializeType//     
        CompressType  protocol.CompressType
    
        Heartbeat         bool
        HeartbeatInterval time.Duration
        /*
         *                  :
            option := client.DefaultOption
            option.Heartbeat = true
            option.HeartbeatInterval = time.Second
        */
    }