Go Micro Brokerソース分析


概要
最初の概説文では,MicroにおけるBrokerの役割,Go Microの全体的な設計について述べた.Microは挿抜可能な分散フレームワークであることも知っています.kafka、rabbitmq、cache、redis、natsなどのさまざまな実装を使用して、git上のプラグインライブラリでgo-pluginsを見つけることができます.インタフェースを見てみましょう.
type Broker interface {
    Init(...Option) error
    Options() Options
    Address() string
    Connect() error
    Disconnect() error
    Publish(topic string, m *Message, opts ...PublishOption) error
    Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
    String() string
}

ConnectオープンbrokerSubstribe登録あるtopicの傍受Publishにあるtopicのメッセージを発行するDisconnectクローズbroker
単純な使用
この記事では、デフォルトのbrokerを使用してhttpbroker分析を実現し、比較的簡単で理解しやすい方法であり、コードアドレスです.
PubTopic関数はパブリッシュ関数です
SubTopic関数はサブスクリプション関数です
次の例では、多くのコメントは必要ありません.簡単にサブスクリプション関数をパブリッシュして実現します.
package main

import (
    "fmt"
    "github.com/micro/go-micro/broker"
    "log"
    "time"
)

func main() {
    if err := broker.Connect(); err != nil {
        fmt.Println("Broker Connect error: %v", err)
    }

    go PubTopic("SubServerName")
    go SubTopic("SubServerName")
    time.Sleep(time.Second * 10)
}

func PubTopic(topic string) {
    tick := time.NewTicker(time.Second)
    i := 0
    for _ = range tick.C {
        msg := &broker.Message{
            Header: map[string]string{
                "id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {
            log.Printf("[pub] failed: %v", err)
        } else {
            fmt.Println("[pub] pubbed message:", string(msg.Body))
        }
        i++
    }
}

func SubTopic(topic string) {
    _, err := broker.Subscribe(topic, func(p broker.Event) error {
        fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })
    if err != nil {
        fmt.Println(err)
    }
}

結果
[pub] pubbed message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860
[sub] received message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 header map[id:0]
[pub] pubbed message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632
[sub] received message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 header map[id:1]
[pub] pubbed message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892
[sub] received message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 header map[id:2]
[pub] pubbed message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327
[sub] received message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 header map[id:3]
[pub] pubbed message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526
[sub] received message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 header map[id:4]

ソース分析Connect
  • tcpリスニング
  • をオン
  • goroutineを起動し、registerInterval間隔でsubscriberに登録し、心拍
  • に似ている.
  • サービス発見登録サービス
  • を設定する.
  • キャッシュオブジェクト
  • を設定する.
  • running=true
  • を設定
    func (h *httpBroker) Connect() error {
        h.RLock()
        if h.running {
            h.RUnlock()
            return nil
        }
        h.RUnlock()
    
        h.Lock()
        defer h.Unlock()
    
        var l net.Listener
        var err error
        //               
        if h.opts.Secure || h.opts.TLSConfig != nil {
            config := h.opts.TLSConfig
    
            fn := func(addr string) (net.Listener, error) {
                if config == nil {
                    hosts := []string{addr}
    
                    // check if its a valid host:port
                    if host, _, err := net.SplitHostPort(addr); err == nil {
                        if len(host) == 0 {
                            hosts = maddr.IPs()
                        } else {
                            hosts = []string{host}
                        }
                    }
    
                    // generate a certificate
                    cert, err := mls.Certificate(hosts...)
                    if err != nil {
                        return nil, err
                    }
                    config = &tls.Config{Certificates: []tls.Certificate{cert}}
                }
                return tls.Listen("tcp", addr, config)
            }
            l, err = mnet.Listen(h.address, fn)
        } else {
            fn := func(addr string) (net.Listener, error) {
                return net.Listen("tcp", addr)
            }
    
            l, err = mnet.Listen(h.address, fn)
        }
    
        if err != nil {
            return err
        }
    
        addr := h.address
        h.address = l.Addr().String()
    
        go http.Serve(l, h.mux)
        go func() {
            //      registerInterval     ,           
            h.run(l)
            h.Lock()
            h.opts.Addrs = []string{addr}
            h.address = addr
            h.Unlock()
        }()
    
        // get registry
        reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
        if !ok {
            reg = registry.DefaultRegistry
        }
        // set cache
        h.r = cache.New(reg)
    
        // set running
        h.running = true
        return nil
    }
    Subscribe
  • 解析address
  • 一意id
  • を作成する
  • パッチワークサービス情報最後のサービス情報を下図
  • に示す.
  • Register(デフォルトはmdns)登録サービス
  • を呼び出す.
  • serviceをsubscribers map[string]*httpSubscriber中
  • に置く
    func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
        options := NewSubscribeOptions(opts...)
    
        // parse address for host, port
        parts := strings.Split(h.Address(), ":")
        host := strings.Join(parts[:len(parts)-1], ":")
        port, _ := strconv.Atoi(parts[len(parts)-1])
    
        addr, err := maddr.Extract(host)
        if err != nil {
            return nil, err
        }
    
        // create unique id
        id := h.id + "." + uuid.New().String()
    
        var secure bool
    
        if h.opts.Secure || h.opts.TLSConfig != nil {
            secure = true
        }
    
        // register service
        node := &registry.Node{
            Id:      id,
            Address: fmt.Sprintf("%s:%d", addr, port),
            Metadata: map[string]string{
                "secure": fmt.Sprintf("%t", secure),
            },
        }
    
        // check for queue group or broadcast queue
        version := options.Queue
        if len(version) == 0 {
            version = broadcastVersion
        }
    
        service := &registry.Service{
            Name:    "topic:" + topic,
            Version: version,
            Nodes:   []*registry.Node{node},
        }
    
        //   sub
        subscriber := &httpSubscriber{
            opts:  options,
            hb:    h,
            id:    id,
            topic: topic,
            fn:    handler,
            svc:   service,
        }
    
        //   subscribe       register     
        if err := h.subscribe(subscriber); err != nil {
            return nil, err
        }
    
        // return the subscriber
        return subscriber, nil
    }
    Publish
  • registryからtopicを取得する消費者ノード
  • メッセージを符号化する
  • 符号化メッセージをノード
  • に順次非同期publishする.
    func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
        // create the message first
        m := &Message{
            Header: make(map[string]string),
            Body:   msg.Body,
        }
    
        for k, v := range msg.Header {
            m.Header[k] = v
        }
    
        m.Header[":topic"] = topic
    
        // encode the message
        b, err := h.opts.Codec.Marshal(m)
        if err != nil {
            return err
        }
    
        // save the message
        h.saveMessage(topic, b)
    
        // now attempt to get the service
        h.RLock()
        s, err := h.r.GetService("topic:" + topic)
        if err != nil {
            h.RUnlock()
            // ignore error
            return nil
        }
        h.RUnlock()
    
        pub := func(node *registry.Node, t string, b []byte) error {
            scheme := "http"
    
            // check if secure is added in metadata
            if node.Metadata["secure"] == "true" {
                scheme = "https"
            }
    
            vals := url.Values{}
            vals.Add("id", node.Id)
    
            uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())
            r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
            if err != nil {
                return err
            }
    
            // discard response body
            io.Copy(ioutil.Discard, r.Body)
            r.Body.Close()
            return nil
        }
    
        srv := func(s []*registry.Service, b []byte) {
            for _, service := range s {
                // only process if we have nodes
                if len(service.Nodes) == 0 {
                    continue
                }
    
                switch service.Version {
                // broadcast version means broadcast to all nodes
                case broadcastVersion:
                    var success bool
    
                    // publish to all nodes
                    for _, node := range service.Nodes {
                        // publish async
                        if err := pub(node, topic, b); err == nil {
                            success = true
                        }
                    }
    
                    // save if it failed to publish at least once
                    if !success {
                        h.saveMessage(topic, b)
                    }
                default:
                    // select node to publish to
                    node := service.Nodes[rand.Int()%len(service.Nodes)]
    
                    // publish async to one node
                    if err := pub(node, topic, b); err != nil {
                        // if failed save it
                        h.saveMessage(topic, b)
                    }
                }
            }
        }
    
        // do the rest async
        go func() {
            // get a third of the backlog
            messages := h.getMessage(topic, 8)
            delay := (len(messages) > 1)
    
            // publish all the messages
            for _, msg := range messages {
                // serialize here
                srv(s, msg)
    
                // sending a backlog of messages
                if delay {
                    time.Sleep(time.Millisecond * 100)
                }
            }
        }()
    
        return nil
    }

    まとめ
    デフォルトのbrokerインプリメンテーションはhttpを使用しますが、一般的にはhttpを使用してパブリケーションとサブスクリプションは実装されません.kafka,redisなどを用いてパブリケーションやサブスクリプションを実現することができます.