go-microのリリース購読Broker分析

14342 ワード

最近は手に時間があります。go-microの発行購読を継続したいです。microのexamplesを見て疑問があります。go-microは購読プラグインBrokerを提供しています。go-micro自身もPublishとSubscribeの機能を実現しました。
Broker
Brokerはgo-microで定義されている非同期メッセージのインターフェースであり、同時にプラグインの形式を使用して、異なる実装(http,nats,rabbiitmq)間でシームレスに切り替えられます。
// Broker is an interface used for asynchronous messaging.
type Broker interface {
   Init(...Option) error
   Options() Options
   Address() string
   Connect() error
   Disconnect() error
   Publish(topic string, m *Message, opts ...PublishOption) error
   Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
   String() string
}
上記のインターフェースからは、Brokerを使って購読を完了するには、以下のステップが必要であることが分かります。
  • Brokerを初期化する(Init
  • 接続Broker(Connect
  • 準備されたBrokerを使って発行/購読する(Publish/Subscribe
  • Brokerを閉じる(Disconnect
  • go-microでデフォルトのブロガーが実現
    go-microはデフォルトでhttpベースのBrokerが実現しています。直接使用できます。microは具体的なexampleを提供しています。具体的にはsource codeの実現を見てください。
    以下はgo-microの中でbroer.goの中でDefault Brokerに関するコードです。
    var (
     DefaultBroker Broker = NewBroker()
    )
    
    func Init(opts ...Option) error {
     return DefaultBroker.Init(opts...)
    }
    
    func Connect() error {
     return DefaultBroker.Connect()
    }
    
    func Disconnect() error {
     return DefaultBroker.Disconnect()
    }
    
    func Publish(topic string, msg *Message, opts ...PublishOption) error {
     return DefaultBroker.Publish(topic, msg, opts...)
    }
    
    func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
     return DefaultBroker.Subscribe(topic, handler, opts...)
    }
    
    func String() string {
     return DefaultBroker.String()
    }
    
    
    NewBroker()によって返されたbrookの例に基づいて作られたパブリック方法のパッケージ化を見ることができます。さらに見てみます。
    // NewBroker returns a new http broker
    func NewBroker(opts ...Option) Broker {
     return newHttpBroker(opts...)
    }
    
    ここでは、httpで実現されたbrookに直接戻って、newHttpBrokerと続く。
    ここにこの部分のコードを書いて、詳しいのは直接go-microの下のhttp.goを参考にすることができます。
    h := &httpBroker{
        id:          uuid.New().String(),
        address:     addr,
        opts:        options,
        r:           options.Registry,
        c:           &http.Client{Transport: newTransport(options.TLSConfig)},
        subscribers: make(map[string][]*httpSubscriber),
        exit:        make(chan chan error),
        mux:         http.NewServeMux(),
        inbox:       make(map[string][][]byte),
      }
    
    ここの核心はnewです。httpBrokerとして、Brokerインターフェースの実現について、具体的な実現はここでは言いません。
    Init
    func (h *httpBroker) Init(opts ...Option) error {
      h.RLock()
      if h.running {
        h.RUnlock()
        return errors.New("cannot init while connected")
      }
      h.RUnlock()
      h.Lock()
      defer h.Unlock()
      for _, o := range opts {
        o(&h.opts)
      }
      if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
        h.address = h.opts.Addrs[0]
      }
      if len(h.id) == 0 {
        h.id = "go.micro.http.broker-" + uuid.New().String()
      }
      // get registry
      reg := h.opts.Registry
      if reg == nil {
        reg = registry.DefaultRegistry
      }
      // get cache
      if rc, ok := h.r.(cache.Cache); ok {
        rc.Stop()
      }
      // set registry
      h.r = cache.New(reg)
      // reconfigure tls config
      if c := h.opts.TLSConfig; c != nil {
        h.c = &http.Client{
          Transport: newTransport(c),
        }
      }
      return nil
    }
    
    上のコードから見られます。Initの役割は各種の構成を初期化することです。Optionパラメータが提供されているなら、パラメータで提供されています。もしここに一つを設置していないなら、ここに2つの点があります。
  • Registry Registryは登録センターです。もしオプトでregistryが提供されていないなら、go-microを使ってデフォルトで実現されます。
  • TLSConfig TLSConfigはhttpsの構成に対して、デフォルトはhttp
  • です。
    Connect
    func (h *httpBroker) Connect() error {
      h.RLock()
      if h.running {
        h.RUnlock()
        return nil
      }
      h.RUnlock()
      
      h.Lock()
      defer h.Unlock()
      
      var l net.Listener
      var err error
      
      if h.opts.Secure || h.opts.TLSConfig != nil {
        config := h.opts.TLSConfig
      
        fn := func(addr string) (net.Listener, error) {
          if config == nil {
            hosts := []string{addr}
    
            // check if its a valid host:port
            if host, _, err := net.SplitHostPort(addr); err == nil {
              if len(host) == 0 {
                hosts = maddr.IPs()
              } else {
                hosts = []string{host}
              }
            }
    
            // generate a certificate
            cert, err := mls.Certificate(hosts...)
            if err != nil {
              return nil, err
            }
            config = &tls.Config{Certificates: []tls.Certificate{cert}}
          }
          return tls.Listen("tcp", addr, config)
        }
    
        l, err = mnet.Listen(h.address, fn)
      } else {
        fn := func(addr string) (net.Listener, error) {
          return net.Listen("tcp", addr)
        }
      
        l, err = mnet.Listen(h.address, fn)
      }
      
      if err != nil {
        return err
      }
      
      addr := h.address
      h.address = l.Addr().String()
      
      go http.Serve(l, h.mux)
      go func() {
        h.run(l)
        h.Lock()
        h.opts.Addrs = []string{addr}
        h.address = addr
        h.Unlock()
      }()
      
      // get registry
      reg := h.opts.Registry
      if reg == nil {
        reg = registry.DefaultRegistry
      }
      // set cache
      h.r = cache.New(reg)
      
      // set running
      h.running = true
      return nil
    }
    
    Connectメソッドの主な役割は、Publishを受信するためにHtto Serverを作成するときに送信されるメッセージです。
    Subscribe
    func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
      var err error
      var host, port string
      options := NewSubscribeOptions(opts...)
      
      // parse address for host, port
      host, port, err = net.SplitHostPort(h.Address())
      if err != nil {
        return nil, err
      }
      
      addr, err := maddr.Extract(host)
      if err != nil {
        return nil, err
      }
      
      var secure bool
      
      if h.opts.Secure || h.opts.TLSConfig != nil {
        secure = true
      }
      
      // register service
      node := &registry.Node{
        Id:      topic + "-" + h.id,
        Address: mnet.HostPort(addr, port),
        Metadata: map[string]string{
          "secure": fmt.Sprintf("%t", secure),
          "broker": "http",
          "topic":  topic,
        },
      }
      
      // check for queue group or broadcast queue
      version := options.Queue
      if len(version) == 0 {
        version = broadcastVersion
      }
      
      service := &registry.Service{
        Name:    serviceName,
        Version: version,
        Nodes:   []*registry.Node{node},
      }
      
      // generate subscriber
      subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    node.Id,
        topic: topic,
        fn:    handler,
        svc:   service,
      }
      
      // subscribe now
      if err := h.subscribe(subscriber); err != nil {
        return nil, err
      }
      
      // return the subscriber
      return subscriber, nil
    }
    
    この部分のコードの核心機能は購読のためのserverを作成し、一つのtopicはserverを作成してhttpSubscriberのsvcリストに収集(登録)します。
    Publish
    func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
      // create the message first
      m := &Message{
        Header: make(map[string]string),
        Body:   msg.Body,
      }
      
      for k, v := range msg.Header {
        m.Header[k] = v
      }
      
      m.Header["Micro-Topic"] = topic
      
      // encode the message
      b, err := h.opts.Codec.Marshal(m)
      if err != nil {
        return err
      }
      
      // save the message
      h.saveMessage(topic, b)
      
      // now attempt to get the service
      h.RLock()
      s, err := h.r.GetService(serviceName)
      if err != nil {
        h.RUnlock()
        return err
      }
      h.RUnlock()
      
      pub := func(node *registry.Node, t string, b []byte) error {
        scheme := "http"
    
        // check if secure is added in metadata
        if node.Metadata["secure"] == "true" {
          scheme = "https"
        }
    
        vals := url.Values{}
        vals.Add("id", node.Id)
    
        uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode  ())
        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
        if err != nil {
          return err
        }
    
        // discard response body
        io.Copy(ioutil.Discard, r.Body)
        r.Body.Close()
        return nil
      }
      
      srv := func(s []*registry.Service, b []byte) {
        for _, service := range s {
          var nodes []*registry.Node
    
          for _, node := range service.Nodes {
            // only use nodes tagged with broker http
            if node.Metadata["broker"] != "http" {
              continue
            }
    
            // look for nodes for the topic
            if node.Metadata["topic"] != topic {
              continue
            }
    
            nodes = append(nodes, node)
          }
    
          // only process if we have nodes
          if len(nodes) == 0 {
            continue
          }
    
          switch service.Version {
          // broadcast version means broadcast to all nodes
          case broadcastVersion:
            var success bool
    
            // publish to all nodes
            for _, node := range nodes {
              // publish async
              if err := pub(node, topic, b); err == nil {
                success = true
              }
            }
    
            // save if it failed to publish at least once
            if !success {
              h.saveMessage(topic, b)
            }
          default:
            // select node to publish to
            node := nodes[rand.Int()%len(nodes)]
            // publish async to one node
            if err := pub(node, topic, b); err != nil {
              // if failed save it
              h.saveMessage(topic, b)
            }
          }
        }
      }
      
      // do the rest async
      go func() {
        // get a third of the backlog
        messages := h.getMessage(topic, 8)
        delay := (len(messages) > 1)
    
        // publish all the messages
        for _, msg := range messages {
          // serialize here
          srv(s, msg)
    
          // sending a backlog of messages
          if delay {
            time.Sleep(time.Millisecond * 100)
          }
        }
      }()
      
      return nil
    
    上のSubscribeを見たら、ここのPublishは簡単です。
  • は、メッセージを作成し、inbox
  • に格納する。
  • は、topicおよびbrookのラベル(ここでは固定http)に基づいて、購読しているserverを検索する(上記の購読モジュールで作成した)。
    上には複数のnodeが検索される可能性がありますので、中にはもう一つのバージョンの仕組みがあります。バージョンを指定するとすべてのマッチノードに送信されます。
  • http postを使用してメッセージを
  • に送信する。
    Disconnect
    func (h *httpBroker) Disconnect() error {
      h.RLock()
      if !h.running {
        h.RUnlock()
        return nil
      }
      h.RUnlock()
      
      h.Lock()
      defer h.Unlock()
      
      // stop cache
      rc, ok := h.r.(cache.Cache)
      if ok {
        rc.Stop()
      }
      
      // exit and return err
      ch := make(chan error)
      h.exit 
    この部分の機能は簡単で、キャッシュをクリアして終了メッセージを送信し、サービスを停止します。
    以上がgo-microのデフォルトhttpによるbrookで実現されました。
    go-microにおけるbrookに対する包装
    brook erのhttpを見て黙認して実現した後、brook erに対して大体の了解ができました。次にgo-microがbrook erに対して作った包装部分を見ています。簡単に使うべきです。
    購読RegisterSubscriber:
    func main() {
      // create a service
      service := micro.NewService(
        micro.Name("go.micro.srv.pubsub"),
      )
      // parse command line
      service.Init()
      
      // register subscriber
      micro.RegisterSubscriber("example.topic.pubsub.1", service.Server(), new(Sub))
      
      // register subscriber with queue, each message is delivered to a unique   subscriber
      micro.RegisterSubscriber("example.topic.pubsub.2", service.Server(), subEv,   server.SubscriberQueue("queue.pubsub"))
      
      if err := service.Run(); err != nil {
        log.Fatal(err)
      }
    }
    
    リリースNewPublisher, Publish
    func main() {
      // create a service
      service := micro.NewService(
        micro.Name("go.micro.cli.pubsub"),
      )
      // parse command line
      service.Init()
      
      // create publisher
      pub1 := micro.NewPublisher("example.topic.pubsub.1", service.Client())
      pub2 := micro.NewPublisher("example.topic.pubsub.2", service.Client())
      
      // pub to topic 1
      go sendEv("example.topic.pubsub.1", pub1)
      // pub to topic 2
      go sendEv("example.topic.pubsub.2", pub2)
      
      // block forever
      select {}
    }
    
    以上はコード選択のみで、具体的な使い方はexampleのpbsubを参照することができます。
    Subscriber
    購読比較は直接BrokerでRegisterSubscriberだけ必要です。中を見てみます。
    //go-micro/micro.go
    // RegisterSubscriber is syntactic sugar for registering a subscriber
    func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
      return s.Subscribe(s.NewSubscriber(topic, h, opts...))
    }
    
    //go-micro/server/rpc_server.go
    func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
      return s.router.NewSubscriber(topic, sb, opts...)
    }
    func (s *rpcServer) Subscribe(sb Subscriber) error {
      s.Lock()
      defer s.Unlock()
      
      if err := s.router.Subscribe(sb); err != nil {
        return err
      }
      
      s.subscribers[sb] = nil
      return nil
    }
    
    //go-micro/server/rpc_router.go
    // router represents an RPC router.
    type router struct {
      .......
      subscribers map[string][]*subscriber
    }
    
    //go-micro/server/subscriber.go
    type subscriber struct {
      topic      string
      rcvr       reflect.Value
      typ        reflect.Type
      subscriber interface{}
      handlers   []*handler
      endpoints  []*registry.Endpoint
      opts       SubscriberOptions
    }
    
    上記の要約コードによると、デフォルトserverのrouterではmapタイプの変数subscribersを定義して、購読のtopicと対応する処理のsubscriberを格納するために使用されています。serverは、メッセージを受信した後、topicからmapに行くだけでsubscriberを見つけて、処理すればいいです。
    subscriberの中で具体的な処理は定義の中から見ることができて、中はルートとレスポンスに対応するhandlerを格納して、go-micro/server/subscriber.goで具体的なコードの実現を見ることができる興味があります。
    Publisher
    発表はgo-microのデフォルトclient実現(rpcuclient)でデフォルトのbrookを定義しました。
    //go-micro/micro.go
    // Deprecated: NewPublisher returns a new Publisher
    func NewPublisher(topic string, c client.Client) Event {
      return NewEvent(topic, c)
    }
    
    // NewEvent creates a new event publisher
    func NewEvent(topic string, c client.Client) Event {
      if c == nil {
        c = client.NewClient()
      }
      return &event{c, topic}
    }
    
    
    //go-micro/event.go
    type event struct {
      c     client.Client
      topic string
    }
    
    func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
      return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
    }
    
    
    ここでは、実際には、転送されたclientを使用して、イベントを初期化し、メッセージを送信するために使用されることが見られ、送信が空であれば、デフォルトでclientを作成する(rpcClient)。
    締め括りをつける
    以上の過程の追跡を経て、最終的にまとめたのは何時ですか?
  • brookはインターフェースを定義しています。microが提供するプラグインの形式はシームレスに代替できます。
  • を実現します。
  • go-microは、http
  • に基づくデフォルトのbrookを提供します。
  • go-microの標準に基づくserver、clientとbroverはセットの更に簡単なpbとsub方法
  • を包装しました。