GolangはRedis(4):AOF持続化とAOF書き換えを実現
6907 ワード
この記事では、golangを使用してredisシリーズを実装する4番目の記事で、golangを使用してAppend Only Fileの永続化とAOFファイルの書き換えを実装する方法について説明します.
本論文の完全なソースコードは著者GithubHDT 3213/godisにある.
AOFファイル
AOF永続化は典型的な非同期タスクであり、メインコヒーレンス(goroutine)はchannelを使用して非同期コヒーレンスにデータを送信し、非同期コヒーレンスによって永続化操作を実行することができる.
DBで関連フィールドを定義します.
永続化には、次の2つの詳細に注意する必要があります. getのような読み取りコマンドは、 を永続化する必要はありません. expireコマンドは、等価なexpireatコマンドで置き換えます.例えば、10:00に実行される
AOFに必要な追加情報をコマンド処理方法で返します.
SETコマンドの例:
処理コマンドのスケジューリング方法でaofコマンドをchannelに送信します.
非同期コパスにコマンドを書き込むには、次の手順に従います.
読み出しプロセスはプロトコル解析器のセクションと基本的に同じであり、本文では後述しない:loadAof.
AOF書き換え
キーaを100回付与すると、AOFファイルに100個の命令が生成されますが、最後の命令のみが有効です.永続化ファイルのサイズを減らすためには、AOF書き換えを行い、不要な命令を削除する必要があります.
書き換えは、メモリ内のデータを直接使用することはできません.Redis書き換えの実装はforkを行い,サブプロセスでデータベース内のデータを巡回してAOFファイルを再生成することである.golangはfork操作をサポートしていないため、forkの代わりにAOFファイルを読み込んでコピーを生成するしかありません.
AOF書き換えを行うには、次の2つの要件を満たす必要があります. AOF書き換えに失敗または中断する場合、AOFファイルは書き換え前の状態を保つ必要があり、データを失うことはできない . AOF書き換え中に実行するコマンドは、新しいAOFファイルに保存する必要があります. を失うことはできません.
そこで、複雑なプロセスを設計しました. AOF書き込みの一時停止->書き換え中の状態の変更->現在のAOFファイルのコピー->AOF書き込みの再開 書き換え中には、コマンドをファイルに書き込むと同時にメモリ内の書き換えキャッシュ領域 にも書き込む永続化プロトコルがある. Coordinate Read AOFコピーを書き換え、一時ファイル(tmp.aof)の に書き換える. AOF書き込みを一時停止->書き換えバッファのコマンドをtmpに書き込む.aof->一時ファイルtmpを使用する.aof AOFファイルの上書き(ファイルシステムのmvコマンドを使用してセキュリティを保証)->書き換えバッファのクリア->AOF書き込みの再開 オンラインサービスをブロックせずに他の操作を行うことが必要であり、AOF書き換えの考え方はこのような問題を解決する際に重要な参考価値を持っている.例えばMysql Online DDL:gh-ostは同様の戦略を採用してデータの一致を保証している.
まず、書き換えを開始する準備をします.
書き換え中に、永続化されたコパスを二重書きにします.
書き換えの実行:
書き換えが完了したら、バッファ内のデータを書き込み、正式なファイルを置き換えます.
本論文の完全なソースコードは著者GithubHDT 3213/godisにある.
AOFファイル
AOF永続化は典型的な非同期タスクであり、メインコヒーレンス(goroutine)はchannelを使用して非同期コヒーレンスにデータを送信し、非同期コヒーレンスによって永続化操作を実行することができる.
DBで関連フィールドを定義します.
type DB struct {
// channel
aofChan chan *reply.MultiBulkReply
// append file
aofFile *os.File
// append file
aofFilename string
// aof , AOF
aofRewriteChan chan *reply.MultiBulkReply
//
pausingAof sync.RWMutex
}
永続化には、次の2つの詳細に注意する必要があります.
expire a 3600
は、キーaが11:00に期限切れであることを示し、10:30にAOFファイルをロードするときに実行されるexpire a 3600
は、11:30に期限切れが元のデータと一致しないことになる.AOFに必要な追加情報をコマンド処理方法で返します.
type extra struct {
//
toPersist bool
// expire
// specialAof == nil , specialAof
specialAof []*reply.MultiBulkReply
}
type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)
SETコマンドの例:
func Set(db *DB, args [][]byte) (redis.Reply, *extra) {
//....
var result int
switch policy {
case upsertPolicy:
result = db.Put(key, entity)
case insertPolicy:
result = db.PutIfAbsent(key, entity)
case updatePolicy:
result = db.PutIfExists(key, entity)
}
extra := &extra{toPersist: result > 0} // toPresist=true, XX NX toPresist=false
if result > 0 {
if ttl != unlimitedTTL { // EX NX
expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
db.Expire(key, expireTime)
// set key value pexpireat set key value EX ttl
extra.specialAof = []*reply.MultiBulkReply{
reply.MakeMultiBulkReply([][]byte{
[]byte("SET"),
args[0],
args[1],
}),
makeExpireCmd(key, expireTime),
}
} else {
db.Persist(key) // override ttl
}
}
return &reply.OkReply{}, extra
}
var pExpireAtCmd = []byte("PEXPIREAT")
func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
args := make([][]byte, 3)
args[0] = pExpireAtCmd
args[1] = []byte(key)
args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
return reply.MakeMultiBulkReply(args)
}
処理コマンドのスケジューリング方法でaofコマンドをchannelに送信します.
func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) {
// ....
// normal commands
var extra *extra
cmdFunc, ok := router[cmd] //
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmd + "'")
}
//
if len(args) > 1 {
result, extra = cmdFunc(db, args[1:])
} else {
result, extra = cmdFunc(db, [][]byte{})
}
// AOF
if config.Properties.AppendOnly {
if extra != nil && extra.toPersist {
// specialAof
if extra.specialAof != nil && len(extra.specialAof) > 0 {
for _, r := range extra.specialAof {
db.addAof(r)
}
} else {
//
r := reply.MakeMultiBulkReply(args)
db.addAof(r)
}
}
}
return
}
非同期コパスにコマンドを書き込むには、次の手順に従います.
func (db *DB) handleAof() {
for cmd := range db.aofChan {
// ,
//
db.pausingAof.RLock()
if db.aofRewriteChan != nil {
db.aofRewriteChan
読み出しプロセスはプロトコル解析器のセクションと基本的に同じであり、本文では後述しない:loadAof.
AOF書き換え
キーaを100回付与すると、AOFファイルに100個の命令が生成されますが、最後の命令のみが有効です.永続化ファイルのサイズを減らすためには、AOF書き換えを行い、不要な命令を削除する必要があります.
書き換えは、メモリ内のデータを直接使用することはできません.Redis書き換えの実装はforkを行い,サブプロセスでデータベース内のデータを巡回してAOFファイルを再生成することである.golangはfork操作をサポートしていないため、forkの代わりにAOFファイルを読み込んでコピーを生成するしかありません.
AOF書き換えを行うには、次の2つの要件を満たす必要があります.
そこで、複雑なプロセスを設計しました.
まず、書き換えを開始する準備をします.
func (db *DB) startRewrite() (*os.File, error) {
// AOF , db.aofChan
db.pausingAof.Lock()
defer db.pausingAof.Unlock()
//
db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize)
//
file, err := ioutil.TempFile("", "aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return file, nil
}
書き換え中に、永続化されたコパスを二重書きにします.
func (db *DB) handleAof() {
for cmd := range db.aofChan {
db.pausingAof.RLock()
if db.aofRewriteChan != nil {
//
db.aofRewriteChan
書き換えの実行:
func (db *DB) aofRewrite() {
file, err := db.startRewrite()
if err != nil {
logger.Warn(err)
return
}
// load aof file
tmpDB := &DB{
Data: dict.MakeSimple(),
TTLMap: dict.MakeSimple(),
Locker: lock.Make(lockerSize),
interval: 5 * time.Second,
aofFilename: db.aofFilename,
}
tmpDB.loadAof()
// rewrite aof file
tmpDB.Data.ForEach(func(key string, raw interface{}) bool {
var cmd *reply.MultiBulkReply
entity, _ := raw.(*DataEntity)
switch val := entity.Data.(type) {
case []byte:
cmd = persistString(key, val)
case *List.LinkedList:
cmd = persistList(key, val)
case *set.Set:
cmd = persistSet(key, val)
case dict.Dict:
cmd = persistHash(key, val)
case *SortedSet.SortedSet:
cmd = persistZSet(key, val)
}
if cmd != nil {
_, _ = file.Write(cmd.ToBytes())
}
return true
})
tmpDB.TTLMap.ForEach(func(key string, raw interface{}) bool {
expireTime, _ := raw.(time.Time)
cmd := makeExpireCmd(key, expireTime)
if cmd != nil {
_, _ = file.Write(cmd.ToBytes())
}
return true
})
db.finishRewrite(file)
}
書き換えが完了したら、バッファ内のデータを書き込み、正式なファイルを置き換えます.
func (db *DB) finishRewrite(tmpFile *os.File) {
// AOF
db.pausingAof.Lock()
defer db.pausingAof.Unlock()
//
// handleAof , aofRewriteChan
loop:
for {
select {
case cmd :=