Golang学習:Redis接続プールの実装

10803 ワード

高い同時アクセスに対応するため、codisと組み合わせてredisの接続プールを使用しています。開発言語はgolangで、GitHubで公開されているgolang製のオープンソース接続プールを使用しています。
golangでの接続プールの実装をより深く理解するために、接続プールを自分で書き直しました。接続プールの具体的な実装ロジックを学び、redis接続を1つ生成する際の負荷分散(複数のサービスアドレスを順番に選択する処理)を最適化しました。
同僚の助けにも感謝します.
自分の成長の記録として、プログラムをここに残しておきます。
プログラムが参照しているオープンソースパッケージは次のとおりです。
"github.com/alecthomas/log4go"
"github.com/garyburd/redigo/redis"
"github.com/samuel/go-zookeeper/zk"

プログラムコード:godis.go
package godis

import (
	"container/list"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/alecthomas/log4go"
	"github.com/garyburd/redigo/redis"
	"github.com/samuel/go-zookeeper/zk"
	"strings"
	"sync"
	"time"
)

// Default values for pool configuration.
const (
	// DEFAULT_ZK_CONNECT_TIMEOUT is not referenced by any code visible in this
	// file. Presumably a default ZooKeeper connect timeout; the unit is not
	// stated here — TODO confirm.
	DEFAULT_ZK_CONNECT_TIMEOUT = 100

	// DEFAULT_MAX_IDLE is the MaxIdle value InitPool applies when the
	// configured value is not positive.
	DEFAULT_MAX_IDLE = 10

	// DEFAULT_MAX_ACTIVE is the MaxActive value InitPool applies when the
	// configured value is not positive.
	DEFAULT_MAX_ACTIVE = 100
)

// proxyInfo is the JSON document stored in each ZooKeeper child node read by
// resetPools.
type proxyInfo struct {
	// Addr is the address later passed to redis.Dial("tcp", ...).
	Addr  string `json:"addr"`
	// State is compared against "online" to decide whether Addr is usable.
	State string `json:"state"`
}

// idleConn is the element type stored in GodisPool.idle.
type idleConn struct {
	// c is the parked connection.
	c redis.Conn
	// t is the time (from nowFunc) at which the connection was parked by put.
	t time.Time
}

// GodisPool is a redis connection pool whose backend addresses are discovered
// from the children of a ZooKeeper node. Set the exported fields, then call
// InitPool before use.
type GodisPool struct {
	// ZkDir is the ZooKeeper path whose children are listed by resetPools and
	// watched by poolWatcher.
	ZkDir string

	// TestOnBorrow, when non-nil, is called with a connection and a timestamp
	// before the connection is handed out; a non-nil error rejects the
	// connection.
	TestOnBorrow func(c redis.Conn, t time.Time) error

	// ZkServerList holds the server addresses passed to zk.Connect.
	ZkServerList []string

	// MaxIdle is the maximum number of connections put keeps in the idle
	// list. InitPool replaces non-positive values with DEFAULT_MAX_IDLE.
	MaxIdle int

	// MaxActive is the limit the active counter is compared against before a
	// new connection is dialed. InitPool replaces non-positive values with
	// DEFAULT_MAX_ACTIVE.
	MaxActive int

	// IdleTimeout is a duration; the code that reads it is not visible in this
	// excerpt. Presumably the age after which idle connections are closed —
	// TODO confirm.
	IdleTimeout time.Duration

	// pools holds the addresses of the proxies whose ZooKeeper state is
	// "online"; it is rebuilt by resetPools. (An earlier variant stored
	// []redis.Conn here instead of addresses.)
	pools []string

	// ZkConnTimeout is the timeout argument passed to zk.Connect.
	ZkConnTimeout time.Duration

	// zkC is a copy of the connection value returned by zk.Connect in initZK.
	zkC zk.Conn

	// Wait selects the behavior when no connection can be handed out: true
	// blocks on cond, false returns a "connect pool exhausted" error.
	Wait bool

	// idle is the list of parked connections (idleConn values); put pushes to
	// the front.
	idle list.List

	// active is incremented when a connection is dialed and decremented by
	// release. mutex is locked around idle/active updates. cond is created
	// lazily on mutex the first time a caller has to wait. nextIdx is the
	// index into pools used for the most recent dial.
	active  int
	mutex   sync.Mutex
	cond    *sync.Cond
	nextIdx int
}

var (
	// nowFunc returns the current time. It is a variable rather than a direct
	// time.Now call, presumably so it can be replaced (e.g. in tests).
	nowFunc = time.Now

	// zkMap maps each ZooKeeper child node name seen by resetPools to the
	// proxy address read from it. resetPools starts a childWatcher only for
	// names that are not yet present here.
	zkMap = make(map[string]string)

	// godisLogHandle is set to true by init after the log4go configuration is
	// loaded; initZK logs only when it is true.
	godisLogHandle = false
)

// InitPool prepares the pool for use: it fills in defaults for MaxIdle and
// MaxActive, connects to ZooKeeper, loads the initial proxy list and, if that
// succeeded, starts poolWatcher in a goroutine. When the proxy list cannot be
// loaded the error is printed and no watcher is started.
func (gp *GodisPool) InitPool() {
	if gp.MaxIdle < 1 {
		gp.MaxIdle = DEFAULT_MAX_IDLE
	}
	if gp.MaxActive < 1 {
		gp.MaxActive = DEFAULT_MAX_ACTIVE
	}

	gp.initZK()

	// Load the proxy list once up front; without it there is nothing to watch.
	if err := gp.resetPools(); err != nil {
		fmt.Println("set pools err:", err.Error())
		return
	}

	// Follow later changes of the ZooKeeper children in the background.
	go gp.poolWatcher()
}

// initZK connects to the servers in ZkServerList using ZkConnTimeout and, on
// success, stores a copy of the returned connection value in gp.zkC.
//
// On failure gp.zkC is left untouched and the error is only logged (and only
// when godisLogHandle is true). Despite the wording of the second log line,
// this function does not retry.
func (gp *GodisPool) initZK() {
	conn, _, err := zk.Connect(gp.ZkServerList, gp.ZkConnTimeout)
	if err == nil {
		gp.zkC = *conn
		return
	}

	if !godisLogHandle {
		return
	}
	log4go.Error("Failed to connect to zookeeper: %+v", err)
	log4go.Error("after 100 millisecond reconnect to zk...")
}

// resetPools rebuilds gp.pools from ZooKeeper.
//
// It lists the children of gp.ZkDir, decodes each child's data as a proxyInfo
// and keeps the address of every proxy whose State is "online". For every
// child name not yet present in zkMap it records the address and starts a
// childWatcher goroutine. Names that were in zkMap before the call but were
// not processed successfully this time (gone, unreadable or undecodable) are
// removed from zkMap.
//
// It returns the error from listing the children, or nil. Note that gp.pools
// is emptied before ZooKeeper is queried, so it stays empty when listing
// fails. This method takes no lock itself.
func (gp *GodisPool) resetPools() error {
	gp.pools = []string{}

	// Snapshot of the names known before this refresh; whatever is still in
	// here after the loop has disappeared from ZooKeeper.
	stale := make(map[string]string, len(zkMap))
	for name, addr := range zkMap {
		stale[name] = addr
	}

	proxys, _, err := gp.zkC.Children(gp.ZkDir)
	if err != nil {
		fmt.Println("connect to zookeeper get children err:", err)
		return err
	}

	for _, child := range proxys {
		connData, _, err := gp.zkC.Get(gp.ZkDir + "/" + child)
		if err != nil {
			continue
		}

		var p proxyInfo
		if err := json.Unmarshal(connData, &p); err != nil {
			// Skip nodes whose payload cannot be decoded instead of
			// registering a zero-value proxy (empty address) and watching it.
			fmt.Println(err.Error())
			continue
		}

		if p.State == "online" {
			gp.pools = append(gp.pools, p.Addr)
		}

		if _, known := stale[child]; !known {
			// First time this node is seen: remember it and watch it.
			zkMap[child] = p.Addr
			go gp.childWatcher(child)
		}

		delete(stale, child)
	}

	// Forget the nodes that were not seen in this refresh.
	for name := range stale {
		delete(zkMap, name)
	}

	// log4go treats a string first argument as a format string, so the values
	// need explicit verbs.
	log4go.Info("new pool len-----> %d", len(gp.pools))
	log4go.Info("zkmap------ %v", zkMap)

	return nil
}

// poolWatcher sets a child watch on gp.ZkDir via ChildrenW. If the watch
// cannot be set it logs the error and returns.
//
// NOTE(review): the text of this file is damaged at the line starting with
// "evt :=". The remainder of poolWatcher and the beginning of the following
// definition(s) are missing, so this region does not compile as shown. The
// statements after that line appear to be the tail of the pool's
// connection-acquire routine (it returns (redis.Conn, error) and expects
// gp.mutex to be held on entry) — confirm against the original source before
// changing any code here.
func (gp *GodisPool) poolWatcher() {
	log4go.Info("start to listen children node change ...")
	for {
		_, _, evtC, err := gp.zkC.ChildrenW(gp.ZkDir)

		if err != nil {
			log4go.Error("watch zkNode %s err: %s", gp.ZkDir, err.Error())
			return
		}

		// NOTE(review): source truncated on the next line; what follows belongs
		// to a different function.
		evt :=  0 {
		// Prune stale idle connections, starting from the back of the list
		// (put pushes to the front, so the back holds the oldest entries).
		for i, n := 0, gp.idle.Len(); i < n; i++ {
			e := gp.idle.Back()

			if e == nil {
				break
			}
			ic := e.Value.(idleConn)

			// Stop at the first entry that has not yet been idle for `timeout`.
			if ic.t.Add(timeout).After(nowFunc()) {
				break
			}

			gp.idle.Remove(e)
			gp.release()
			// The mutex is released while the connection is being closed.
			gp.mutex.Unlock()
			ic.c.Close()
			gp.mutex.Lock()

		}
	}
	// NOTE(review): these two calls pass a value but the message has no format
	// verb.
	log4go.Info("active=-----------------", gp.active)
	log4go.Info("idle=---------------->", gp.idle.Len())
	for {
		// Try to reuse an idle connection, taking from the front of the list.
		for i, n := 0, gp.idle.Len(); i < n; i++ {
			e := gp.idle.Front()
			if e == nil {
				break
			}

			ic := e.Value.(idleConn)
			gp.idle.Remove(e)
			test := gp.TestOnBorrow
			gp.mutex.Unlock()
			// Hand the connection out if there is no check or the check passes.
			// gp.active is not modified on this path.
			if test == nil || test(ic.c, ic.t) == nil {
				fmt.Println("---get----from----idle---")
				return ic.c, nil
			}

			// The check failed: close the connection and give its slot back.
			ic.c.Close()
			gp.mutex.Lock()
			gp.release()
		}

		// Dial new connection if under limit.
		if gp.MaxActive == 0 || gp.active < gp.MaxActive {
			// Reload the proxy list when it is empty.
			if len(gp.pools) == 0 {
				rsE := gp.resetPools()
				if rsE != nil {
					gp.mutex.Unlock()
					return nil, rsE
				}
			}

			// Advance to the next address, wrapping around at the end.
			gp.nextIdx += 1
			if gp.nextIdx >= len(gp.pools) {
				gp.nextIdx = 0
			}
			if len(gp.pools) == 0 {
				gp.mutex.Unlock()
				err := errors.New("Proxy list empty")
				log4go.Error(err)
				return nil, err
			} else {
				fmt.Println("---get----from----new---")
				c := gp.pools[gp.nextIdx]
				gp.active += 1
				// NOTE(review): the dial happens while gp.mutex is still held.
				_conn, _err := redis.Dial("tcp", c)
				gp.mutex.Unlock()
				if _err != nil {
					// NOTE(review): gp.active was incremented above and is not
					// decremented on this return path.
					log4go.Error("Create redis connection err: %s", _err.Error())
					return nil, _err
				}
				test := gp.TestOnBorrow
				if test == nil || test(_conn, nowFunc()) == nil {
					return _conn, nil
				}
				// NOTE(review): the rejected connection is dropped here without
				// Close being called on it.
				_conn = nil
				gp.mutex.Lock()
				gp.release()
				gp.mutex.Unlock()

				return _conn, errors.New("Create redis connection err")
			}
		}

		// At the limit: fail right away unless the caller asked to wait.
		if !gp.Wait {
			gp.mutex.Unlock()
			return nil, errors.New("connect pool exhausted")
		}

		// The condition variable is created on first use.
		if gp.cond == nil {
			gp.cond = sync.NewCond(&gp.mutex)
		}

		// Block until release or put signals, then retry from the top.
		gp.cond.Wait()
	}
}

// release gives one connection slot back: it decrements gp.active and, if a
// condition variable has been created, signals one waiter. It does not lock
// gp.mutex itself.
func (gp *GodisPool) release() {
	gp.active--
	if gp.cond == nil {
		return
	}
	gp.cond.Signal()
}

// put takes back a connection that a caller has finished with.
//
// A connection without an error is parked at the front of the idle list as
// long as the list holds fewer than MaxIdle entries; put then returns nil.
// Otherwise (connection in error state, or idle list full) the slot is
// released and the connection is closed; the result of Close is returned.
func (gp *GodisPool) put(c redis.Conn) error {
	err := c.Err()

	gp.mutex.Lock()
	if err == nil && gp.idle.Len() < gp.MaxIdle {
		gp.idle.PushFront(idleConn{c: c, t: nowFunc()})
		fmt.Println("add-to-idle=---------------->", gp.idle.Len())

		// A parked connection keeps its slot in gp.active: the acquire path
		// hands idle connections out without incrementing the counter and
		// decrements it when it discards one. Decrementing here as well would
		// drive the counter negative and defeat MaxActive, so only wake a
		// waiter to let it pick up the idle connection.
		if gp.cond != nil {
			gp.cond.Signal()
		}
		gp.mutex.Unlock()
		return nil
	}

	if err != nil {
		fmt.Println("----errr===", err)
	}

	// The connection leaves the pool for good: free its slot, then close it
	// outside the lock.
	gp.release()
	gp.mutex.Unlock()

	return c.Close()
}

// pooledConnection wraps a connection together with the pool it is returned
// to on Close.
type pooledConnection struct {
	// p is the pool that Close hands the connection back to.
	p     *GodisPool
	// c is the wrapped connection that all operations are forwarded to.
	c     redis.Conn
	// state is a bit set of WatchState, MultiState, SubscribeState and
	// MonitorState, updated by Do and Send from the command's CommandInfo.
	state int
}

// Close hands the wrapped connection back to the pool and returns the pool's
// result: nil when the connection was parked for reuse, otherwise the error
// (if any) from closing the underlying connection.
//
// NOTE(review): pc.c is left unchanged, so calling Close twice returns the
// same connection to the pool twice — callers must call it exactly once.
func (pc *pooledConnection) Close() error {
	return pc.p.put(pc.c)
}

// Err reports the error state of the wrapped connection.
func (pc *pooledConnection) Err() error {
	underlying := pc.c
	return underlying.Err()
}

// Do updates the tracked connection state for commandName (setting the bits
// in CommandInfo.Set, then clearing those in CommandInfo.Clear) and forwards
// the command to the wrapped connection.
func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
	info := LookupCommandInfo(commandName)
	pc.state |= info.Set
	pc.state &^= info.Clear

	reply, err = pc.c.Do(commandName, args...)
	return reply, err
}

// Send updates the tracked connection state for commandName (setting the bits
// in CommandInfo.Set, then clearing those in CommandInfo.Clear) and forwards
// the command to the wrapped connection's Send.
func (pc *pooledConnection) Send(commandName string, args ...interface{}) error {
	info := LookupCommandInfo(commandName)
	pc.state |= info.Set
	pc.state &^= info.Clear

	return pc.c.Send(commandName, args...)
}

// Flush forwards to the wrapped connection's Flush.
func (pc *pooledConnection) Flush() error {
	underlying := pc.c
	return underlying.Flush()
}

// Receive forwards to the wrapped connection's Receive.
func (pc *pooledConnection) Receive() (reply interface{}, err error) {
	reply, err = pc.c.Receive()
	return reply, err
}

// Connection state flags. Each constant is a distinct bit (1 << iota gives
// 1, 2, 4, 8), so they can be combined in CommandInfo.Set / CommandInfo.Clear
// and in pooledConnection.state.
const (
	WatchState = 1 << iota
	MultiState
	SubscribeState
	MonitorState
)

// CommandInfo describes how a command changes the tracked connection state:
// the bits in Set are turned on, then the bits in Clear are turned off.
type CommandInfo struct {
	Set, Clear int
}

// commandInfos maps command names to their state changes. Only upper-case
// names are listed here; init adds a lower-case alias for each entry.
// Commands that are not listed yield the zero CommandInfo on lookup.
var commandInfos = map[string]CommandInfo{
	"WATCH":      {Set: WatchState},
	"UNWATCH":    {Clear: WatchState},
	"MULTI":      {Set: MultiState},
	"EXEC":       {Clear: WatchState | MultiState},
	"DISCARD":    {Clear: WatchState | MultiState},
	"PSUBSCRIBE": {Set: SubscribeState},
	"SUBSCRIBE":  {Set: SubscribeState},
	"MONITOR":    {Set: MonitorState},
}

//     
func init() {
	for n, ci := range commandInfos {
		commandInfos[strings.ToLower(n)] = ci
	}

	log4go.LoadConfiguration("log4g.xml")

	godisLogHandle = true

}

// LookupCommandInfo returns the CommandInfo registered for commandName. The
// name is looked up as given first and, if absent, again in upper case; an
// unknown command yields the zero CommandInfo.
func LookupCommandInfo(commandName string) CommandInfo {
	info, found := commandInfos[commandName]
	if !found {
		info = commandInfos[strings.ToUpper(commandName)]
	}
	return info
}

// errorConnection is a connection stand-in that performs no work: every
// method simply reports the stored error.
type errorConnection struct{ err error }

// Do ignores the command and returns a nil reply with the stored error.
func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) {
	return nil, ec.err
}

// Send ignores the command and returns the stored error.
func (ec errorConnection) Send(string, ...interface{}) error {
	return ec.err
}

// Err returns the stored error.
func (ec errorConnection) Err() error {
	return ec.err
}

// Close returns the stored error.
func (ec errorConnection) Close() error {
	return ec.err
}

// Flush returns the stored error.
func (ec errorConnection) Flush() error {
	return ec.err
}

// Receive returns a nil reply with the stored error.
func (ec errorConnection) Receive() (interface{}, error) {
	return nil, ec.err
}

テストプログラムコード:godis-test.go
package main

import (
	"fmt"
	"github.com/garyburd/redigo/redis"
	"gotest/godis"
	"time"
)

var (
	// Godispool is the pool shared by main and the testg goroutines; main
	// assigns and initialises it before any goroutine is started.
	Godispool *godis.GodisPool
)

func main() {
	Godispool = &godis.GodisPool{
		MaxIdle:       5,
		MaxActive:     6,
		ZkServerList:  []string{"127.0.0.1:2181"}, //10.20.30.91
		ZkDir:         "/serverlist",
		IdleTimeout:   (time.Duration(3) * time.Second),
		ZkConnTimeout: 3 * time.Second,
		Wait:          true,
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if _, err := c.Do("PING"); err != nil {
				return err
			}
			return nil
		},
	}
	Godispool.InitPool()

	gConn := Godispool.Get()

	_, sErr := gConn.Do("set", "aaa", "aaaaaaaaa")

	if sErr != nil {
		fmt.Println("set err -: ", sErr.Error())
	}

	rs, err := redis.String(gConn.Do("get", "aaa"))

	if err != nil {
		fmt.Println("get err:", err.Error())
	}

	fmt.Println("rs--", rs)
	//
	//time.Sleep(time.Second)
	//
	//CConn := Godispool.Get()
	//
	//fmt.Println("----222222222222222-----")
	//rs2, err2 := redis.String(gConn.Do("get", "aaa"))
	//
	//if err2 != nil {
	//	fmt.Println("get err:", err.Error())
	//}
	//
	//fmt.Println("rs----------------------", rs2)
	//
	//defer CConn.Close()
	defer gConn.Close()
	for i := 0; i < 50; i++ {
		go testg()
		//time.Sleep(500 * time.Millisecond)
	}

	time.Sleep(1000 * time.Second)

}

// testg takes a connection from the shared pool, reads the key "aaa", prints
// the result (and the error, if any), then holds the connection for one more
// second before the deferred Close returns it.
func testg() {
	conn := Godispool.Get()
	defer conn.Close()

	value, err := redis.String(conn.Do("get", "aaa"))
	if err != nil {
		fmt.Println("get err:", err.Error())
	}
	fmt.Println("rs----------------------", value)

	time.Sleep(time.Second)
}