etcd-go構成センター

27788 ワード

初期化

type SecLayerConf struct {
	Proxy2LayerRedis RedisConf
	Layer2ProxyRedis RedisConf
	EtcdConfig       EtcdConf
	LogPath          string
	LogLevel         string

	WriteGoroutineNum      int
	ReadGoroutineNum       int
	HandleUserGoroutineNum int
	Read2handleChanSize    int
	Handle2WriteChanSize   int
	MaxRequestWaitTimeout  int

	SendToWriteChanTimeout  int
	SendToHandleChanTimeout int

	SecProductInfoMap map[int]*SecProductInfoConf
	TokenPasswd       string
}

type EtcdConf struct {
	EtcdAddr          string
	Timeout           int
	EtcdSecKeyPrefix  string
	EtcdSecProductKey string
}

type SecLayerContext struct {
	proxy2LayerRedisPool *redis.Pool
	layer2ProxyRedisPool *redis.Pool
	etcdClient           *etcd_client.Client
	RWSecProductLock     sync.RWMutex

	secLayerConf     *SecLayerConf
	waitGroup        sync.WaitGroup
	Read2HandleChan  chan *SecRequest
	Handle2WriteChan chan *SecResponse

	HistoryMap     map[int]*UserBuyHistory
	HistoryMapLock sync.Mutex

	// 
	productCountMgr *ProductCountMgr
}

func initEtcd(conf *SecLayerConf) (err error) {
	cli, err := etcd_client.New(etcd_client.Config{
		Endpoints:   []string{conf.EtcdConfig.EtcdAddr},
		DialTimeout: time.Duration(conf.EtcdConfig.Timeout) * time.Second,
	})
	if err != nil {
		logs.Error("connect etcd failed, err:", err)
		return
	}

	secLayerContext.etcdClient = cli
	logs.Debug("init etcd succ")
	return
}

etcdにデータを送信

type Activity struct {
	ActivityId int `db:"id"`
	ActivityName string `db:"name"`
	ProductId int `db:"product_id"`
	StartTime int64 `db:"start_time"`
	EndTime int64 `db:"end_time"`
	Total int `db:"total"`
	Status int `db:"status"`

	StartTimeStr string
	EndTimeStr string
	StatusStr string
	Speed int `db:"sec_speed"`
	BuyLimit int `db:"buy_limit"`
	BuyRate float64 `db:"buy_rate"`
}

func (p *ActivityModel) SyncToEtcd(activity *Activity) (err error) {
	//  key
	if strings.HasSuffix(EtcdPrefix, "/") == false {
		EtcdPrefix = EtcdPrefix + "/"
	}
	etcdKey  := fmt.Sprintf("%s%s", EtcdPrefix, EtcdProductKey)
	//  etcd 
	secProductInfoList, err := loadProductFromEtcd(etcdKey)

	//  
	var secProductInfo SecProductInfoConf
	secProductInfo.EndTime =  activity.EndTime
	secProductInfo.OnePersonBuyLimit = activity.BuyLimit
	secProductInfo.ProductId = activity.ProductId
	secProductInfo.SoldMaxLimit = activity.Speed
	secProductInfo.StartTime = activity.StartTime
	secProductInfo.Status = activity.Status
	secProductInfo.Total = activity.Total
	secProductInfo.BuyRate = activity.BuyRate

	secProductInfoList = append(secProductInfoList, secProductInfo)
	//  
	data, err := json.Marshal(secProductInfoList)
	if err != nil {
		logs.Error("json marshal failed, err:%v", err)
		return
	}
	//  etcd 
	_, err = EtcdClient.Put(context.Background(), etcdKey, string(data))
	if err != nil {
		logs.Error("put to etcd failed, err:%v, data[%v]", err, string(data))
		return
	}

	logs.Debug("put to etcd succ, data:%v", string(data))
	return
}

etcdからデータを取得する

func loadProductFromEtcd(conf *SecLayerConf) (err error) {
	logs.Debug("start get from etcd succ")
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	//  
	resp, err := secLayerContext.etcdClient.Get(ctx, conf.EtcdConfig.EtcdSecProductKey)
	if err != nil {
		logs.Error("get [%s] from etcd failed, err:%v", conf.EtcdConfig.EtcdSecProductKey, err)
		return
	}
	logs.Debug("get from etcd succ, resp:%v", resp)
	//  
	var secProductInfo []SecProductInfoConf
	for k, v := range resp.Kvs {
		logs.Debug("key[%v] valud[%v]", k, v)
		err = json.Unmarshal(v.Value, &secProductInfo)
		if err != nil {
			logs.Error("Unmarshal sec product info failed, err:%v", err)
			return
		}

		logs.Debug("sec info conf is [%v]", secProductInfo)
	}

	updateSecProductInfo(conf, secProductInfo)
	logs.Debug("update product info succ, data:%v", secProductInfo)

	initSecProductWatcher(conf)

	logs.Debug("init etcd watcher succ")
	return
}

func updateSecProductInfo(conf *SecLayerConf, secProductInfo []SecProductInfoConf) {

	var tmp = make(map[int]*SecProductInfoConf, 1024)
	for _, v := range secProductInfo {
		produtInfo := v
		produtInfo.secLimit = &SecLimit{}
		tmp[v.ProductId] = &produtInfo
	}

	secLayerContext.RWSecProductLock.Lock()
	conf.SecProductInfoMap = tmp
	secLayerContext.RWSecProductLock.Unlock()
}

func initSecProductWatcher(conf *SecLayerConf) {
	go watchSecProductKey(conf)
}



watch操作

func watchSecProductKey(conf *SecLayerConf) {
	key := conf.EtcdConfig.EtcdSecProductKey
	logs.Debug("begin watch key:%s", key)
	var err error
	for {
		rch := secLayerContext.etcdClient.Watch(context.Background(), key)
		var secProductInfo []SecProductInfoConf
		var getConfSucc = true

		for wresp := range rch {
			for _, ev := range wresp.Events {
				if ev.Type == mvccpb.DELETE {
					logs.Warn("key[%s] 's config deleted", key)
					continue
				}

				if ev.Type == mvccpb.PUT && string(ev.Kv.Key) == key {
					err = json.Unmarshal(ev.Kv.Value, &secProductInfo)
					if err != nil {
						logs.Error("key [%s], Unmarshal[%s], err:%v ", err)
						getConfSucc = false
						continue
					}
				}
				logs.Debug("get config from etcd, %s %q : %q
"
, ev.Type, ev.Kv.Key, ev.Kv.Value) } if getConfSucc { logs.Debug("get config from etcd succ, %v", secProductInfo) updateSecProductInfo(conf, secProductInfo) } } } }