RPCXフレーム
RPCX RPCXはGolangがjava生態圏の流行の枠組みを参考にして構築した機能豊富なマイクロサービスプラットフォームであり、高性能で誤りを許容できるプラグイン式のRPCフレームワークを実現した. RPCXの目標:簡単:学習しやすく、開発しやすく、統合しやすく、発表しやすい高性能:grpc-goをはるかに上回っており、dubboとmotanサービスの発見とサービスガバナンスは言うまでもない:大規模なマイクロサービスクラスタのプラットフォーム間の開発を容易にする:rpcx 3.0の下層は標準rpcライブラリではなく、プラットフォーム間のバイナリプロトコルを採用している.効率的だが便利な多言語開発 その特徴は、 を含む. RPCX仕様: サービス登録:Server.Register()とServer.RegisterName() Rpcxでは、純粋な関数をサービスとして登録することもサポートされています.関数は、エクスポート可能またはエクスポート不可の3つのパラメータを受け入れることができます.1つ目はcontext.contextタイプで、他の2つはエクスポート可能(または内蔵)のタイプです.3番目のパラメータは、ポインタにerrorタイプの戻り値RegisterFunction("service",function")/純関数をサービス に登録するサービス起動:
サービス側:tokenの検証に使用する認証関数を定義する必要があります.server.AuthFuncは検証関数を受信します. RPCXには、4つのエラーパターンが含まれています.1、FailFastは、1つのノードへのアクセスエラーが発生するとすぐにエラーを返します.2、Failtryは、1つのノードへのアクセスが成功するまで、または最大アクセス回数に達するまで繰り返しアクセスします.3、FailBackup:この場合、アクセスしたノードが一定時間以内に結果を返さない場合、別のノードにアクセスします.このモードは、2つのノード4にのみ適用され、Failoverは、1つのノードへのアクセスにエラーが発生した場合、ノードへのアクセスが成功するか、失敗した最大しきい値に達するまで、他の打鍵ノードへのアクセスを継続します. 失敗モード設定はXclient作成時に設定が、同期モードのみのため、非同期モードではこのパラメータは意味のない である.アクセス時間と再試行回数Optionの関連オプションで を設定する.
Forkモード このモードはFailBackupに似ていますが、サービスを所有するすべてのノードに対して、ノードが正常に戻るとクライアントに返されます.2つに限らず、xclient.Fork()を使用してパラメータを直接呼び出します.Call と同じです.
ブロードキャストモードこのモードは、すべてのサービスノードが正常に戻ることを要求し、任意のノードエラーがある場合、エラー を返す.
ルートサービス規模が大きい場合、通常、負荷等化のためにマイクロサービスクラスタがあり、この場合、1つのサービスが複数のノードに配置され、クライアントがサービスにアクセスするノードを選択するプロセスがRPCXのルーティングである. 分類: Random_selector:ランダム選択ノードアクセス(RandomSelect) Roundrobin_selectorポーリング方式アクセス、各ノードが(RoundRobin) にアクセスできることを保証する Weighted_selector:Nginxと同じようにスムーズなウェイトベースのポーリング(WeightedRoundRobin) Ping_selector:ネットワーク品質優先、順次ping選択ネットワーク品質最高(WeightedICMP) hash_selector:コンシステンシハッシュ、JumpConsistentHashを使用してノードを選択し、同じサービスPath、サービスMethod、およびパラメータが同じノードにルーティングされます.JumpConsistentHashはコンシステンシハッシュを迅速に計算するアルゴリズムですが、ノードを削除できないという欠点があり、ノードを削除するとルーティングが不正確になるので、ノードが変動したときにコンシステンシハッシュを再計算します.(ConsistentHash) geo_selector:地理的優先:サービス登録時に所在する経緯度を指定し、経緯度の相対位置に基づいて最適な を選択する必要がある user_selecor:カスタムルーティング規則(SelectByUser) ルーティング設定Xclient作成時に を指定する.
メタデータ(metadata)メタデータは、通信中の要求または応答データではなく、補助ラベルなどの情報であり、文字列キー値ペアである. クライアントが送信メタデータを読み込むにはコンテキストでshare.ReqMetaDataKeyを設定する必要があり、受信にはshare.ResMetaDataKeyを設定する必要がある:=context.WithValue(context.Background()、share.ReqMetaDataKey、map[string]string{"test")ctx=context.WithValue(ctx,share.ResMetaDataKey,make(map[string]string)) サービス:reqMeta:=ctx.Value(share.ReqMetaDataKey).(map[string]string)resMeta:=ctx.Value(share.ResMetaDataKey).(map[string]string) Option
1、 go ,
2、 , 、tracing
3、 TCP、HTTP、QUIC、KCP
4、 , JSON、Protobuf、MessagePack
5、 、 、zookeeper、etcd、consul、mDNS
6、 Failover、Failfast、Failtry
7、 、 、 、 、
8、
9、 ( )
10、
11、 heartbeat
12、 metrics、log、timeout、 、 、TLS
:
1、
2、 (context.Context,args ,args2),
3、
* ,
* : TCP/UDP ,
* func NewServer(options ...OptionFn) *Server {
s := &Server{
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
}
for _, op := range options {
op(s)
}
return s
}// , Server{}
type Server struct {
Plugins PluginContainer//
// AuthFunc ,
AuthFunc func(ctx context.Context, req *protocol.Message, token string) error
//
}
//rpcx OptionFn :
func WithReadTimeout(readTimeout time.Duration) OptionFn//
func WithTLSConfig(cfg *tls.Config) OptionFn//tls
func WithWriteTimeout(writeTimeout time.Duration) OptionFn//
* config := &tls.Config{Certificates: []tls.Certificate{cert}}
s := server.NewServer(server.WithTLSConfig(config))//TLS TCP
func (s *Server) Close() error
func (s *Server) RegisterOnShutdown(f func())
func (s *Server) Serve(network, address string) (err error)// TCP/UDP
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)// HTTP
* rpcx
tcp:
http: http
unix: unix domain sockets
reuseport: SO_REUSEPORT socket , Linux kernel 3.9+
quic: support quic protocol
kcp: sopport kcp protocol
* :
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
xclient.Call()
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
* :
func (client *Client) Close() error
func (c *Client) Connect(network, address string) error
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
func (client *Client) IsClosing() bool
func (client *Client) IsShutdown() bool
* XClient Client ,
Broadcast , 。 FailMode SelectMode 。 。
Fork , 。 FailMode SelectMode 。
* rpcx network @ Host: port 。 network tcp , http ,unix ,quic kcp。 Host IP 。
#######
1、Peer2Peer
2、MultiServer:
:
d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
MultipleServersDiscovery.Update()
3、ZooKeeper
.....
####### rpcx 4 :
* Json
* Protobuffer//google
* MsgPack//
* SerializeNone// , []byte
, json json
####### :
```bash
MsgPack
type MsgpackCodec struct{}//
// Encode encodes an object into slice of bytes.
func (c MsgpackCodec) Encode(i interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := msgpack.NewEncoder(&buf)
//enc.UseJSONTag(true)
err := enc.Encode(i)
return buf.Bytes(), err
}
// Decode decodes an object from slice of bytes.
func (c MsgpackCodec) Decode(data []byte, i interface{}) error {
dec := msgpack.NewDecoder(bytes.NewReader(data))
//dec.UseJSONTag(true)
err := dec.Decode(i)
return err
}
// ,
Gob:
func main() {
flag.Parse()
share.Codecs[protocol.SerializeType(4)] = &GobCodec{}//
s := server.NewServer()
//s.RegisterName("Arith", new(example.Arith), "")
s.Register(new(example.Arith), "")
s.Serve("tcp", *addr)
}
type GobCodec struct {
}
func (c *GobCodec) Decode(data []byte, i interface{}) error {
enc := gob.NewDecoder(bytes.NewBuffer(data))
err := enc.Decode(i)
return err
}
func (c *GobCodec) Encode(i interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(i)
return buf.Bytes(), err
}
資格認定サービス側:tokenの検証に使用する認証関数を定義する必要があります.server.AuthFuncは検証関数を受信します.
nc auth(ctx context.Context, req *protocol.Message, token string) error {
if token == "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwYXNzd29yZCI6IjE5OTgzNyIsInVzZXJuYW1lIjoiaGFkZXMifQ.zg1dDuhn393DVyIH_1s7eSj74JsBw_vLYbUiq72tAm8" {
return nil
}
return errors.New("wrong token")
}
s.AuthFunc=auth
:
client.Auth(token)// Auth()
エラーモードForkモード
ブロードキャストモード
ルート
メタデータ(metadata)
type Option struct {
// group
Group string
/*
option.Group , , server.Register("Arith","group=test")
option.Group="test"
*/
//
Retries int
// tcp quic tls
TLSConfig *tls.Config
// kcp.BlockCrypt
Block interface{}
// RPCPath for http connection
RPCPath string
/
ConnectTimeout time.Duration
//
ReadTimeout time.Duration
//
WriteTimeout time.Duration
// BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time.// FailBackup
BackupLatency time.Duration
//
GenBreaker func() Breaker
/*
Breaker
, , ,
*/
SerializeType protocol.SerializeType//
CompressType protocol.CompressType
Heartbeat bool
HeartbeatInterval time.Duration
/*
* :
option := client.DefaultOption
option.Heartbeat = true
option.HeartbeatInterval = time.Second
*/
}