GolangはRedis(4):AOF持続化とAOF書き換えを実現

6907 ワード

この記事では、golangを使用してredisシリーズを実装する4番目の記事で、golangを使用してAppend Only Fileの永続化とAOFファイルの書き換えを実装する方法について説明します.
本論文の完全なソースコードは著者GithubHDT 3213/godisにある.
AOFファイル
AOF永続化は典型的な非同期タスクであり、メインコヒーレンス(goroutine)はchannelを使用して非同期コヒーレンスにデータを送信し、非同期コヒーレンスによって永続化操作を実行することができる.
DBで関連フィールドを定義します.
type DB struct {
    //       channel               
    aofChan     chan *reply.MultiBulkReply 
    // append file      
    aofFile     *os.File  
    // append file   
	aofFilename string 

    // aof         ,  AOF        
    aofRewriteChan chan *reply.MultiBulkReply 
    //                   
	pausingAof     sync.RWMutex 
}

永続化には、次の2つの詳細に注意する必要があります.
  • getのような読み取りコマンドは、
  • を永続化する必要はありません.
  • expireコマンドは、等価なexpireatコマンドで置き換えます.例えば、10:00に実行されるexpire a 3600は、キーaが11:00に期限切れであることを示し、10:30にAOFファイルをロードするときに実行されるexpire a 3600は、11:30に期限切れが元のデータと一致しないことになる.

  • AOFに必要な追加情報をコマンド処理方法で返します.
    type extra struct {
        //             
        toPersist  bool 
        //       expire             
        //   specialAof == nil          ,      specialAof     
    	specialAof []*reply.MultiBulkReply 
    }
    
    type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)
    

    SETコマンドの例:
    func Set(db *DB, args [][]byte) (redis.Reply, *extra) {
        //....
        var result int
        switch policy {
        case upsertPolicy:
            result = db.Put(key, entity)
        case insertPolicy:
            result = db.PutIfAbsent(key, entity)
        case updatePolicy:
            result = db.PutIfExists(key, entity)
        }
        extra := &extra{toPersist: result > 0} //          toPresist=true,    XX NX           toPresist=false
        if result > 0 {
            if ttl != unlimitedTTL { //     EX   NX   
                expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
                db.Expire(key, expireTime)
                //        set key value   pexpireat      set key value EX ttl   
                extra.specialAof = []*reply.MultiBulkReply{ 
                    reply.MakeMultiBulkReply([][]byte{
                        []byte("SET"),
                        args[0],
                        args[1],
                    }),
                    makeExpireCmd(key, expireTime),
                }
            } else {
                db.Persist(key) // override ttl
            }
        }
        return &reply.OkReply{}, extra
    }
    
    var pExpireAtCmd = []byte("PEXPIREAT")
    
    func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
    	args := make([][]byte, 3)
    	args[0] = pExpireAtCmd
    	args[1] = []byte(key)
    	args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
    	return reply.MakeMultiBulkReply(args)
    }
    

    処理コマンドのスケジューリング方法でaofコマンドをchannelに送信します.
    func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) {
    	// ....
    	// normal commands
    	var extra *extra
    	cmdFunc, ok := router[cmd] //            
    	if !ok {
    		return reply.MakeErrReply("ERR unknown command '" + cmd + "'")
        }
        //           
    	if len(args) > 1 {
    		result, extra = cmdFunc(db, args[1:])
    	} else {
    		result, extra = cmdFunc(db, [][]byte{})
    	}
    
    	// AOF    
    	if config.Properties.AppendOnly {
    		if extra != nil && extra.toPersist {
                //    specialAof
    			if extra.specialAof != nil && len(extra.specialAof) > 0 {
    				for _, r := range extra.specialAof {
    					db.addAof(r)
    				}
    			} else {
                    //       
    				r := reply.MakeMultiBulkReply(args)
    				db.addAof(r)
    			}
    		}
    	}
    	return
    }
    

    非同期コパスにコマンドを書き込むには、次の手順に従います.
    func (db *DB) handleAof() {
    	for cmd := range db.aofChan {
            //                 ,                 
            //                       
    		db.pausingAof.RLock() 
    		if db.aofRewriteChan != nil {
    			db.aofRewriteChan 

    読み出しプロセスはプロトコル解析器のセクションと基本的に同じであり、本文では後述しない:loadAof.
    AOF書き換え
    キーaを100回付与すると、AOFファイルに100個の命令が生成されますが、最後の命令のみが有効です.永続化ファイルのサイズを減らすためには、AOF書き換えを行い、不要な命令を削除する必要があります.
    書き換えは、メモリ内のデータを直接使用することはできません.Redis書き換えの実装はforkを行い,サブプロセスでデータベース内のデータを巡回してAOFファイルを再生成することである.golangはfork操作をサポートしていないため、forkの代わりにAOFファイルを読み込んでコピーを生成するしかありません.
    AOF書き換えを行うには、次の2つの要件を満たす必要があります.
  • AOF書き換えに失敗または中断する場合、AOFファイルは書き換え前の状態を保つ必要があり、データを失うことはできない
  • .
  • AOF書き換え中に実行するコマンドは、新しいAOFファイルに保存する必要があります.
  • を失うことはできません.
    そこで、複雑なプロセスを設計しました.
  • AOF書き込みの一時停止->書き換え中の状態の変更->現在のAOFファイルのコピー->AOF書き込みの再開
  • 書き換え中には、コマンドをファイルに書き込むと同時にメモリ内の書き換えキャッシュ領域
  • にも書き込む永続化プロトコルがある.
  • Coordinate Read AOFコピーを書き換え、一時ファイル(tmp.aof)の
  • に書き換える.
  • AOF書き込みを一時停止->書き換えバッファのコマンドをtmpに書き込む.aof->一時ファイルtmpを使用する.aof AOFファイルの上書き(ファイルシステムのmvコマンドを使用してセキュリティを保証)->書き換えバッファのクリア->AOF書き込みの再開
  • オンラインサービスをブロックせずに他の操作を行うことが必要であり、AOF書き換えの考え方はこのような問題を解決する際に重要な参考価値を持っている.例えばMysql Online DDL:gh-ostは同様の戦略を採用してデータの一致を保証している.
    まず、書き換えを開始する準備をします.
    func (db *DB) startRewrite() (*os.File, error) {
        //   AOF  ,      db.aofChan      
    	db.pausingAof.Lock() 
    	defer db.pausingAof.Unlock()
    
    	//        
    	db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize)
    
    	//       
    	file, err := ioutil.TempFile("", "aof")
    	if err != nil {
    		logger.Warn("tmp file create failed")
    		return nil, err
    	}
    	return file, nil
    }
    

    書き換え中に、永続化されたコパスを二重書きにします.
    func (db *DB) handleAof() {
    	for cmd := range db.aofChan {
    		db.pausingAof.RLock() 
    		if db.aofRewriteChan != nil {
                //          
    			db.aofRewriteChan 

    書き換えの実行:
    func (db *DB) aofRewrite() {
    	file, err := db.startRewrite()
    	if err != nil {
    		logger.Warn(err)
    		return
    	}
    
    	// load aof file
    	tmpDB := &DB{
    		Data:     dict.MakeSimple(),
    		TTLMap:   dict.MakeSimple(),
    		Locker:   lock.Make(lockerSize),
    		interval: 5 * time.Second,
    
    		aofFilename: db.aofFilename,
    	}
    	tmpDB.loadAof()
    
    	// rewrite aof file
    	tmpDB.Data.ForEach(func(key string, raw interface{}) bool {
    		var cmd *reply.MultiBulkReply
    		entity, _ := raw.(*DataEntity)
    		switch val := entity.Data.(type) {
    		case []byte:
    			cmd = persistString(key, val)
    		case *List.LinkedList:
    			cmd = persistList(key, val)
    		case *set.Set:
    			cmd = persistSet(key, val)
    		case dict.Dict:
    			cmd = persistHash(key, val)
    		case *SortedSet.SortedSet:
    			cmd = persistZSet(key, val)
    
    		}
    		if cmd != nil {
    			_, _ = file.Write(cmd.ToBytes())
    		}
    		return true
    	})
    	tmpDB.TTLMap.ForEach(func(key string, raw interface{}) bool {
    		expireTime, _ := raw.(time.Time)
    		cmd := makeExpireCmd(key, expireTime)
    		if cmd != nil {
    			_, _ = file.Write(cmd.ToBytes())
    		}
    		return true
    	})
    
    	db.finishRewrite(file)
    }
    

    書き換えが完了したら、バッファ内のデータを書き込み、正式なファイルを置き換えます.
    func (db *DB) finishRewrite(tmpFile *os.File) {
        //   AOF  
    	db.pausingAof.Lock() 
    	defer db.pausingAof.Unlock()
    
    
        //                 
    	//   handleAof    ,     aofRewriteChan       
        loop:
    	for {
    		select {
    		case cmd :=