Go Micro Brokerソース分析
概要
最初の概説文では,MicroにおけるBrokerの役割,Go Microの全体的な設計について述べた.Microは挿抜可能な分散フレームワークであることも知っています.kafka、rabbitmq、cache、redis、natsなどのさまざまな実装を使用して、git上のプラグインライブラリでgo-pluginsを見つけることができます.インタフェースを見てみましょう.
ConnectオープンbrokerSubstribe登録あるtopicの傍受Publishにあるtopicのメッセージを発行するDisconnectクローズbroker
単純な使用
この記事では、デフォルトのbrokerを使用してhttpbroker分析を実現し、比較的簡単で理解しやすい方法であり、コードアドレスです.
PubTopic関数はパブリッシュ関数です
SubTopic関数はサブスクリプション関数です
次の例では、多くのコメントは必要ありません.簡単にサブスクリプション関数をパブリッシュして実現します.
結果
ソース分析 tcpリスニング をオン goroutineを起動し、registerInterval間隔でsubscriberに登録し、心拍 に似ている.サービス発見登録サービス を設定する.キャッシュオブジェクト を設定する. running=true を設定解析address 一意id を作成するパッチワークサービス情報最後のサービス情報を下図 に示す. Register(デフォルトはmdns)登録サービス を呼び出す. serviceをsubscribers map[string]*httpSubscriber中 に置く registryからtopicを取得する消費者ノード メッセージを符号化する 符号化メッセージをノード に順次非同期publishする.
まとめ
デフォルトのbrokerインプリメンテーションはhttpを使用しますが、一般的にはhttpを使用してパブリケーションとサブスクリプションは実装されません.kafka,redisなどを用いてパブリケーションやサブスクリプションを実現することができます.
最初の概説文では,MicroにおけるBrokerの役割,Go Microの全体的な設計について述べた.Microは挿抜可能な分散フレームワークであることも知っています.kafka、rabbitmq、cache、redis、natsなどのさまざまな実装を使用して、git上のプラグインライブラリでgo-pluginsを見つけることができます.インタフェースを見てみましょう.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
ConnectオープンbrokerSubstribe登録あるtopicの傍受Publishにあるtopicのメッセージを発行するDisconnectクローズbroker
単純な使用
この記事では、デフォルトのbrokerを使用してhttpbroker分析を実現し、比較的簡単で理解しやすい方法であり、コードアドレスです.
PubTopic関数はパブリッシュ関数です
SubTopic関数はサブスクリプション関数です
次の例では、多くのコメントは必要ありません.簡単にサブスクリプション関数をパブリッシュして実現します.
package main
import (
"fmt"
"github.com/micro/go-micro/broker"
"log"
"time"
)
func main() {
if err := broker.Connect(); err != nil {
fmt.Println("Broker Connect error: %v", err)
}
go PubTopic("SubServerName")
go SubTopic("SubServerName")
time.Sleep(time.Second * 10)
}
func PubTopic(topic string) {
tick := time.NewTicker(time.Second)
i := 0
for _ = range tick.C {
msg := &broker.Message{
Header: map[string]string{
"id": fmt.Sprintf("%d", i),
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
if err := broker.Publish(topic, msg); err != nil {
log.Printf("[pub] failed: %v", err)
} else {
fmt.Println("[pub] pubbed message:", string(msg.Body))
}
i++
}
}
func SubTopic(topic string) {
_, err := broker.Subscribe(topic, func(p broker.Event) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
if err != nil {
fmt.Println(err)
}
}
結果
[pub] pubbed message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860
[sub] received message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 header map[id:0]
[pub] pubbed message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632
[sub] received message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 header map[id:1]
[pub] pubbed message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892
[sub] received message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 header map[id:2]
[pub] pubbed message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327
[sub] received message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 header map[id:3]
[pub] pubbed message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526
[sub] received message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 header map[id:4]
ソース分析
Connect
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
//
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
// registerInterval ,
h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()
}()
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
Subscribe
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
options := NewSubscribeOptions(opts...)
// parse address for host, port
parts := strings.Split(h.Address(), ":")
host := strings.Join(parts[:len(parts)-1], ":")
port, _ := strconv.Atoi(parts[len(parts)-1])
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
// create unique id
id := h.id + "." + uuid.New().String()
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := ®istry.Node{
Id: id,
Address: fmt.Sprintf("%s:%d", addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := ®istry.Service{
Name: "topic:" + topic,
Version: version,
Nodes: []*registry.Node{node},
}
// sub
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe register
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
Publish
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header[":topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
// ignore error
return nil
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range service.Nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
}
まとめ
デフォルトのbrokerインプリメンテーションはhttpを使用しますが、一般的にはhttpを使用してパブリケーションとサブスクリプションは実装されません.kafka,redisなどを用いてパブリケーションやサブスクリプションを実現することができます.