Golang実装Redis(3):メモリデータベースの実装

7310 ワード

本文はgolang実装redisシリーズの第3編で、主にメモリKVデータベースを実装する方法を紹介します.本稿の完全なソースコードは著者Github:HDT 3213/godis
db.goはメモリデータベースの主なソースファイルであるdb.Execメソッドは、プロトコル解析器からコマンドパラメータを取得し、対応する処理関数を呼び出して処理します.
目次:
  • Concurrent Hash Map
  • LockMap
  • TTL

  • Concurrent Hash Map
    KVメモリデータベースのコアは同時セキュリティのハッシュテーブルであり、一般的な設計はいくつかあります.
  • sync.map:golangが公式に提供する同時ハッシュテーブルは、性能は優れているが、構造は複雑で、
  • の拡張に不便である.
  • juc.ConcurrentHashMap:javaの同時ハッシュテーブルはセグメントロックで実現される.拡張時にハッシュ・テーブル・スレッドにアクセスすると、rehash操作が支援され、rehashが終了する前にすべての読み書き操作がブロックされます.キャッシュ・データベースにおけるキー値の対数が多く、読み書き操作に対する応答時間が高いため、jucを使用するポリシーは適切ではありません.
  • memcached hashtable:バックグラウンドスレッドでrehash操作を行うと、メインスレッドはアクセスするハッシュスロットがrehashされたかどうかを判断してold_を決定するhashtableか操作primary_かhashtable. この戦略は、プライマリ・スレッドとrehashスレッドとの間の競合をハッシュ・スロット内に制限し、rehash動作が読み書き動作に及ぼす影響を最小限に抑えることが最も理想的な実現方法である.しかし、著者は知識が浅いためgolangを使ってこの戦略を実現できないため、我慢して放棄します(主な原因はgolangがvolatileキーワードがなく、スレッドの可視性を保証する操作が非常に複雑であるためです)、読者の皆さんの討論を歓迎します.

  • 本文はsync.mapリリース前にgolangコミュニティで広く使用されていたセグメントロックポリシー.keyを一定数のshardに分散してrehash動作を回避した.shardはロック保護されたmapであり、shardがrehashを行うとshard内の読み書きがブロックされるが、他のshardには影響しない.
    この戦略は簡単で信頼性が高く実現しやすいが,hash性能は2回必要であるためやや劣っている.このdict完全ソースコードはGithubで独立して使用できます(何の役にも立たないが...).
    データ構造の定義:
    type ConcurrentDict struct {
        table []*Shard
        count int32
    }
    
    type Shard struct {
        m     map[string]interface{}
        mutex sync.RWMutex
    }
    

    構築時にshardを初期化するには、この操作には比較的時間がかかります.
    func computeCapacity(param int) (size int) {
    	if param <= 16 {
    		return 16
    	}
    	n := param - 1
    	n |= n >> 1
    	n |= n >> 2
    	n |= n >> 4
    	n |= n >> 8
    	n |= n >> 16
    	if n < 0 {
    		return math.MaxInt32
    	} else {
    		return int(n + 1)
    	}
    }
    
    func MakeConcurrent(shardCount int) *ConcurrentDict {
        shardCount = computeCapacity(shardCount)
        table := make([]*Shard, shardCount)
        for i := 0; i < shardCount; i++ {
            table[i] = &Shard{
                m: make(map[string]interface{}),
            }
        }
        d := &ConcurrentDict{
            count: 0,
            table: table,
        }
        return d
    }
    

    ハッシュアルゴリズムFNVアルゴリズムを選択:
    const prime32 = uint32(16777619)
    
    func fnv32(key string) uint32 {
        hash := uint32(2166136261)
        for i := 0; i < len(key); i++ {
            hash *= prime32
            hash ^= uint32(key[i])
        }
        return hash
    }
    

    shardを位置決めし、nが2の整数べき乗のときh%n==(n-1)&h
    func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {
    	if dict == nil {
    		panic("dict is nil")
    	}
    	tableSize := uint32(len(dict.table))
    	return (tableSize - 1) & uint32(hashCode)
    }
    
    func (dict *ConcurrentDict) getShard(index uint32) *Shard {
    	if dict == nil {
    		panic("dict is nil")
    	}
    	return dict.table[index]
    }
    

    GetメソッドとPutメソッドの実装:
    func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {
    	if dict == nil {
    		panic("dict is nil")
    	}
    	hashCode := fnv32(key)
    	index := dict.spread(hashCode)
    	shard := dict.getShard(index)
    	shard.mutex.RLock()
    	defer shard.mutex.RUnlock()
    	val, exists = shard.m[key]
    	return
    }
    
    func (dict *ConcurrentDict) Len() int {
    	if dict == nil {
    		panic("dict is nil")
    	}
    	return int(atomic.LoadInt32(&dict.count))
    }
    
    // return the number of new inserted key-value
    func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {
    	if dict == nil {
    		panic("dict is nil")
    	}
    	hashCode := fnv32(key)
    	index := dict.spread(hashCode)
    	shard := dict.getShard(index)
    	shard.mutex.Lock()
    	defer shard.mutex.Unlock()
    
    	if _, ok := shard.m[key]; ok {
    		shard.m[key] = val
    		return 0
    	} else {
    		shard.m[key] = val
    		dict.addCount()
    		return 1
    	}
    }
    

    LockMap
    前節で実装したConcurrentMapは、単一のkey操作の同時セキュリティを保証しますが、まだ要件を満たすことはできません.
  • MSETNXコマンドは、所与のキーがすべて存在しない場合にのみ、所与のキー設定値をすべてロックするので、すべてのキーのチェックと設定が完了するまで、所与のキーをすべてロックする必要があります.
  • LPOPコマンドは、リストの最後の要素を除去する後にキー値ペアを除去する必要があるので、要素を除去するまでキーをロックし、空のリスト
  • を除去する.
    dbを実現する必要がありますLockerは、1つまたは複数のkeyをロックし、必要に応じてロックを解除するために使用されます.
    dbを実現する.Lockerの最も直接的な考え方はmap[string]*sync.RWMutexを使用することであり、ロックプロセスは2つのステップに分けられる:対応するロックを初期化→ロックし、ロックプロセスも2つのステップに分けられる:ロックを解除→対応するロックを解放する.では、解決できない同時問題があります.
    時間
    協程A
    コンシステントB
    1
    locker["a"].Unlock()
    2
    locker["a"] = &sync.RWMutex
    3
    delete(locker["a"])
    4
    locker["a"].Lock()
    t 3時において、リンクBがロックを解除したため、t 4時において、リンクAがロックしようとすると失敗する.
    ロックを解除するときにロックを解除しないと、この異常は回避できますが、使用したロックごとに解放されず、深刻なメモリ漏洩が発生します.
    ハッシュテーブルの長さは可能なキーの数よりはるかに少なく,逆に複数のキーが1つのハッシュスロットを共用できることに気づいた.単一のキーではなく、ハッシュ・スロットにロックを追加すると、ハッシュ・スロットの数が非常に少ないため、ロックを解放しなくてもメモリがあまり消費されません.
    著者らは,この考えに基づいて,同時制御問題を解決するために,LockerMapを実現した.
    type Locks struct {
        table []*sync.RWMutex
    }
    
    func Make(tableSize int) *Locks {
        table := make([]*sync.RWMutex, tableSize)
        for i := 0; i < tableSize; i++ {
            table[i] = &sync.RWMutex{}
        }
        return &Locks{
            table: table,
        }
    }
    
    func (locks *Locks)Lock(key string) {
        index := locks.spread(fnv32(key))
        mu := locks.table[index]
        mu.Lock()
    }
    
    func (locks *Locks)UnLock(key string) {
        index := locks.spread(fnv32(key))
        mu := locks.table[index]
        mu.Unlock()
    }
    

    ハッシュアルゴリズムはDictの項で説明されているが,これ以上説明しない.
    複数のkeyをロックする際に注意しなければならないのは、コヒーレントAがキーaを持つロックがキーbのロックを獲得しようとすると、コヒーレントBがキーbを持つロックがキーaを獲得しようとするロックがデッドロックとなることである.
    解決策は、すべてのコヒーレンスが同じ順序でロックされ、両方のコヒーレンスがキーaとキーbのロックを取得したい場合は、キーaのロックを取得してからキーbのロックを取得しなければならず、サイクル待ちを回避することができる.
    func (locks *Locks)Locks(keys ...string) {
        keySlice := make(sort.StringSlice, len(keys))
        copy(keySlice, keys)
        sort.Sort(keySlice)
        for _, key := range keySlice {
            locks.Lock(key)
        }
    }
    
    func (locks *Locks)RLocks(keys ...string) {
        keySlice := make(sort.StringSlice, len(keys))
        copy(keySlice, keys)
        sort.Sort(keySlice)
        for _, key := range keySlice {
            locks.RLock(key)
        }
    }
    

    TTL
    Time To Live(TTL)の実現方式は非常に簡単で,その核心はstring→timeハッシュテーブルである.
    キーにアクセスすると、有効期限が切れているかどうかを確認し、有効期限が切れているキーを削除します.
    func (db *DB) Get(key string) (*DataEntity, bool) {
    	db.stopWorld.RLock()
    	defer db.stopWorld.RUnlock()
    
    	raw, ok := db.Data.Get(key)
    	if !ok {
    		return nil, false
    	}
    	if db.IsExpired(key) {
    		return nil, false
    	}
    	entity, _ := raw.(*DataEntity)
    	return entity, true
    }
    
    func (db *DB) IsExpired(key string) bool {
    	rawExpireTime, ok := db.TTLMap.Get(key)
    	if !ok {
    		return false
    	}
    	expireTime, _ := rawExpireTime.(time.Time)
    	expired := time.Now().After(expireTime)
    	if expired {
    		db.Remove(key)
    	}
    	return expired
    }
    

    期限切れキーを定期的にチェックして削除します.
    func (db *DB) CleanExpired() {
    	now := time.Now()
    	toRemove := &List.LinkedList{}
    	db.TTLMap.ForEach(func(key string, val interface{}) bool {
    		expireTime, _ := val.(time.Time)
    		if now.After(expireTime) {
    			// expired
    			db.Data.Remove(key)
    			toRemove.Add(key)
    		}
    		return true
    	})
    	toRemove.ForEach(func(i int, val interface{}) bool {
    		key, _ := val.(string)
    		db.TTLMap.Remove(key)
    		return true
    	})
    }
    
    func (db *DB) TimerTask() {
    	ticker := time.NewTicker(db.interval)
    	go func() {
    		for range ticker.C {
    			db.CleanExpired()
    		}
    	}()
    }