go-microのリリース購読Broker分析
14342 ワード
最近は手に時間があります。go-microの発行購読を継続したいです。microのexamplesを見て疑問があります。go-microは購読プラグインBrokerを提供しています。go-micro自身もPublishとSubscribeの機能を実現しました。
Broker
Brokerはgo-microで定義されている非同期メッセージのインターフェースであり、同時にプラグインの形式を使用して、異なる実装(http,nats,rabbiitmq)間でシームレスに切り替えられます。 Brokerを初期化する( 接続Broker( 準備されたBrokerを使って発行/購読する( Brokerを閉じる( go-microでデフォルトのブロガーが実現
go-microはデフォルトでhttpベースのBrokerが実現しています。直接使用できます。microは具体的なexampleを提供しています。具体的にはsource codeの実現を見てください。
以下はgo-microの中でbroer.goの中でDefault Brokerに関するコードです。
ここにこの部分のコードを書いて、詳しいのは直接go-microの下のhttp.goを参考にすることができます。
Init Registry Registryは登録センターです。もしオプトでregistryが提供されていないなら、go-microを使ってデフォルトで実現されます。 TLSConfig TLSConfigはhttpsの構成に対して、デフォルトはhttp です。
Connect
Subscribe
Publishは、メッセージを作成し、inbox に格納する。は、topicおよびbrookのラベル(ここでは固定http)に基づいて、購読しているserverを検索する(上記の購読モジュールで作成した)。
上には複数のnodeが検索される可能性がありますので、中にはもう一つのバージョンの仕組みがあります。バージョンを指定するとすべてのマッチノードに送信されます。
http postを使用してメッセージを に送信する。
Disconnect
以上がgo-microのデフォルトhttpによるbrookで実現されました。
go-microにおけるbrookに対する包装
brook erのhttpを見て黙認して実現した後、brook erに対して大体の了解ができました。次にgo-microがbrook erに対して作った包装部分を見ています。簡単に使うべきです。
購読
Subscriber
購読比較は直接Brokerで
subscriberの中で具体的な処理は定義の中から見ることができて、中はルートとレスポンスに対応するhandlerを格納して、go-micro/server/subscriber.goで具体的なコードの実現を見ることができる興味があります。
Publisher
発表はgo-microのデフォルトclient実現(rpcuclient)でデフォルトのbrookを定義しました。
締め括りをつける
以上の過程の追跡を経て、最終的にまとめたのは何時ですか? brookはインターフェースを定義しています。microが提供するプラグインの形式はシームレスに代替できます。 を実現します。 go-microは、http に基づくデフォルトのbrookを提供します。 go-microの標準に基づくserver、clientとbroverはセットの更に簡単なpbとsub方法 を包装しました。
Broker
Brokerはgo-microで定義されている非同期メッセージのインターフェースであり、同時にプラグインの形式を使用して、異なる実装(http,nats,rabbiitmq)間でシームレスに切り替えられます。
// Broker is an interface used for asynchronous messaging.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
上記のインターフェースからは、Brokerを使って購読を完了するには、以下のステップが必要であることが分かります。Init
)Connect
)Publish/Subscribe
)Disconnect
)go-microはデフォルトでhttpベースのBrokerが実現しています。直接使用できます。microは具体的なexampleを提供しています。具体的にはsource codeの実現を見てください。
以下はgo-microの中でbroer.goの中でDefault Brokerに関するコードです。
var (
DefaultBroker Broker = NewBroker()
)
func Init(opts ...Option) error {
return DefaultBroker.Init(opts...)
}
func Connect() error {
return DefaultBroker.Connect()
}
func Disconnect() error {
return DefaultBroker.Disconnect()
}
func Publish(topic string, msg *Message, opts ...PublishOption) error {
return DefaultBroker.Publish(topic, msg, opts...)
}
func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
return DefaultBroker.Subscribe(topic, handler, opts...)
}
func String() string {
return DefaultBroker.String()
}
NewBroker()
によって返されたbrookの例に基づいて作られたパブリック方法のパッケージ化を見ることができます。さらに見てみます。// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
return newHttpBroker(opts...)
}
ここでは、httpで実現されたbrookに直接戻って、newHttpBroker
と続く。ここにこの部分のコードを書いて、詳しいのは直接go-microの下のhttp.goを参考にすることができます。
h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
ここの核心はnewです。httpBrokerとして、Brokerインターフェースの実現について、具体的な実現はここでは言いません。Init
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "go.micro.http.broker-" + uuid.New().String()
}
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// get cache
if rc, ok := h.r.(cache.Cache); ok {
rc.Stop()
}
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
}
}
return nil
}
上のコードから見られます。Initの役割は各種の構成を初期化することです。Optionパラメータが提供されているなら、パラメータで提供されています。もしここに一つを設置していないなら、ここに2つの点があります。Connect
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()
}()
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
Connectメソッドの主な役割は、Publishを受信するためにHtto Serverを作成するときに送信されるメッセージです。Subscribe
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {
return nil, err
}
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := ®istry.Node{
Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := ®istry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
この部分のコードの核心機能は購読のためのserverを作成し、一つのtopicはserverを作成してhttpSubscriberのsvcリストに収集(登録)します。Publish
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header["Micro-Topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService(serviceName)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode ())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
// only process if we have nodes
if len(nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
上のSubscribe
を見たら、ここのPublish
は簡単です。上には複数のnodeが検索される可能性がありますので、中にはもう一つのバージョンの仕組みがあります。バージョンを指定するとすべてのマッチノードに送信されます。
Disconnect
func (h *httpBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop cache
rc, ok := h.r.(cache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit
この部分の機能は簡単で、キャッシュをクリアして終了メッセージを送信し、サービスを停止します。以上がgo-microのデフォルトhttpによるbrookで実現されました。
go-microにおけるbrookに対する包装
brook erのhttpを見て黙認して実現した後、brook erに対して大体の了解ができました。次にgo-microがbrook erに対して作った包装部分を見ています。簡単に使うべきです。
購読
RegisterSubscriber
:func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.srv.pubsub"),
)
// parse command line
service.Init()
// register subscriber
micro.RegisterSubscriber("example.topic.pubsub.1", service.Server(), new(Sub))
// register subscriber with queue, each message is delivered to a unique subscriber
micro.RegisterSubscriber("example.topic.pubsub.2", service.Server(), subEv, server.SubscriberQueue("queue.pubsub"))
if err := service.Run(); err != nil {
log.Fatal(err)
}
}
リリースNewPublisher, Publish
:func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.cli.pubsub"),
)
// parse command line
service.Init()
// create publisher
pub1 := micro.NewPublisher("example.topic.pubsub.1", service.Client())
pub2 := micro.NewPublisher("example.topic.pubsub.2", service.Client())
// pub to topic 1
go sendEv("example.topic.pubsub.1", pub1)
// pub to topic 2
go sendEv("example.topic.pubsub.2", pub2)
// block forever
select {}
}
以上はコード選択のみで、具体的な使い方はexampleのpbsubを参照することができます。Subscriber
購読比較は直接Brokerで
RegisterSubscriber
だけ必要です。中を見てみます。//go-micro/micro.go
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
//go-micro/server/rpc_server.go
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return s.router.NewSubscriber(topic, sb, opts...)
}
func (s *rpcServer) Subscribe(sb Subscriber) error {
s.Lock()
defer s.Unlock()
if err := s.router.Subscribe(sb); err != nil {
return err
}
s.subscribers[sb] = nil
return nil
}
//go-micro/server/rpc_router.go
// router represents an RPC router.
type router struct {
.......
subscribers map[string][]*subscriber
}
//go-micro/server/subscriber.go
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*registry.Endpoint
opts SubscriberOptions
}
上記の要約コードによると、デフォルトserverのrouterではmapタイプの変数subscribers
を定義して、購読のtopicと対応する処理のsubscriber
を格納するために使用されています。serverは、メッセージを受信した後、topicからmapに行くだけでsubscriberを見つけて、処理すればいいです。subscriberの中で具体的な処理は定義の中から見ることができて、中はルートとレスポンスに対応するhandlerを格納して、go-micro/server/subscriber.goで具体的なコードの実現を見ることができる興味があります。
Publisher
発表はgo-microのデフォルトclient実現(rpcuclient)でデフォルトのbrookを定義しました。
//go-micro/micro.go
// Deprecated: NewPublisher returns a new Publisher
func NewPublisher(topic string, c client.Client) Event {
return NewEvent(topic, c)
}
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
if c == nil {
c = client.NewClient()
}
return &event{c, topic}
}
//go-micro/event.go
type event struct {
c client.Client
topic string
}
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
}
ここでは、実際には、転送されたclientを使用して、イベントを初期化し、メッセージを送信するために使用されることが見られ、送信が空であれば、デフォルトでclientを作成する(rpcClient
)。締め括りをつける
以上の過程の追跡を経て、最終的にまとめたのは何時ですか?