Redisプロトコル解析の実装
7823 ワード
この文書は、「GolangによるRedisの実装」シリーズの2番目の記事です.この文書では、Redis通信プロトコルとプロトコル解析器の実装について説明します.プロトコルについてよく知っていれば、プロトコル解析器のセクションを直接読むことができます.
Redis通信プロトコル
Redisは2.0版から統一プロトコルRESP(REdis Serialization Protocol)を使用しており、このプロトコルは実現しやすく、コンピュータは効率的に解析でき、人間に読みやすくなっている.
RESPは、TCPプロトコル上で動作するバイナリセキュリティのテキストプロトコルです.クライアントとサーバが送信するコマンドまたはデータは、
RESPは5つのフォーマットを定義しています.単純文字列(Simple String):サーバは、「OK」などの単純な結果を返すために使用されます.バイナリ以外は安全で、改行は許可されていません. エラーメッセージ(Error):サーバは、「ERR Invalid Synatx」などの簡単な結果を返すために使用されます.バイナリ以外は安全で、改行は許可されていません. 整数(Integer): 文字列(Bulk String):バイナリセキュリティ文字列、 .配列(Array、旧版ドキュメントではMulti Bulk Stringsと呼ぶ):Bulk String配列、クライアント送信命令および .
RESPは、最初の文字でフォーマットを表します.単純文字列:「+OKr」 のように「+」で開始エラー:"-"で開始します.たとえば:"-ERR Invalid Synatxr" 整数:「:」で始まります.たとえば、「:1r」 文字列: 配列: Bulk Stringは2行あり、第1の動作
Bulk Stringは、バイナリ・セキュリティで任意のバイトを含むことができます.つまり、Bulk Stringの内部に「r」文字を含めることができます(行末のCRLFが隠されています):
Arrayフォーマットの最初の動作"*"+配列長は、対応する数のBulk Stringに続く.例えば、
クライアントもArray形式でサービス側に命令を送信する.コマンド自体は、
改行を印刷:
プロトコル解析器
TCPサーバの実装については、プロトコル解析器がそのHandlerインタフェースをアプリケーション層サーバとして実装するTCPサーバの実装について説明した.
プロトコル解析器は、Socketから送信されたデータを受信し、
本明細書の完全なコード:Github:HDT 3213/godis
クライアントからの要求は、最初の行にメッセージの合計行数をマークし、
RESPは
したがって、4行目の
クライアント抽象として
解析の定義:
次に、主なセクションを作成できます.
Redis通信プロトコル
Redisは2.0版から統一プロトコルRESP(REdis Serialization Protocol)を使用しており、このプロトコルは実現しやすく、コンピュータは効率的に解析でき、人間に読みやすくなっている.
RESPは、TCPプロトコル上で動作するバイナリセキュリティのテキストプロトコルです.クライアントとサーバが送信するコマンドまたはデータは、
\r
(CRLF)で終わります.RESPは5つのフォーマットを定義しています.
llen
、scard
などのコマンドの戻り値、64ビット符号付き整数get
などのコマンドの戻り値lrange
などの命令応答のフォーマットRESPは、最初の文字でフォーマットを表します.
$
から*
から$
+本文長、第2の動作の実際の内容.次のようになります.$3\r
SET\r
Bulk Stringは、バイナリ・セキュリティで任意のバイトを含むことができます.つまり、Bulk Stringの内部に「r」文字を含めることができます(行末のCRLFが隠されています):
$4
a\r
b
$-1
はnilを表し、例えばgetコマンドを使用して存在しないkeyを問合せた場合、応答は$-1
である.Arrayフォーマットの最初の動作"*"+配列長は、対応する数のBulk Stringに続く.例えば、
["foo", "bar"]
のメッセージ:*2
$3
foo
$3
bar
クライアントもArray形式でサービス側に命令を送信する.コマンド自体は、
SET key value
コマンドのRESPメッセージのような最初のパラメータとして使用されます.*3
$3
SET
$3
key
$5
value
改行を印刷:
*3\r
$3\r
SET\r
$3\r
key\r
$5\r
value\r
プロトコル解析器
TCPサーバの実装については、プロトコル解析器がそのHandlerインタフェースをアプリケーション層サーバとして実装するTCPサーバの実装について説明した.
プロトコル解析器は、Socketから送信されたデータを受信し、
[][]byte
のように"*3\r
$3\r
SET\r
$3\r
key\r
$5\r\value\r
"
のフォーマットに復元する.本明細書の完全なコード:Github:HDT 3213/godis
クライアントからの要求は、最初の行にメッセージの合計行数をマークし、
['SET', 'key', 'value']
を支店として使用する配列形式である.CRLF
標準ライブラリでは、readerから読み込まれたデータをbufferにキャッシュし、セパレータに遭遇したり、読み取りが完了した後に戻ったりすることができるので、bufio
を使用して、完全なローを読み出すたびに保証します.RESPは
reader.ReadBytes('
')
のプロトコルであり、本文で
文字を使用できることに注意してください.例えば、Redisは、CRLF
命令を正しく受信し、実行することができ、この命令の正しいメッセージは、次のようなものである.*3
$3
SET
$4
a\r
b
$7
myvalue
SET "a\r
b" 1
が5行目の「arbr」に読み込まれた場合、2行と誤認されます.*3
$3
SET
$4
a //
b //
$7
myvalue
したがって、4行目の
ReadBytes
を読み取った後、$4
を使用して次の行を読み取るべきではなく、ReadBytes('
')
を使用して指定された長さのコンテンツを読み取るべきである.msg = make([]byte, 4 + 2) // 4 + 2
_, err = io.ReadFull(reader, msg)
クライアント抽象として
io.ReadFull(reader, msg)
構造体を定義します.type Client struct {
/* Tcp */
conn net.Conn
/*
* timeout WaitGroup,
* waiting ,
*/
waitingReply wait.Wait
/* */
sending atomic.AtomicBool
/* , Array */
expectedArgsCount uint32
/* , len(args)*/
receivedCount uint32
/*
* , []byte
*/
args [][]byte
}
解析の定義:
type Handler struct {
/*
*
* *Client -> placeholder
*/
activeConn sync.Map
/* , */
db db.DB
/* , */
closing atomic.AtomicBool
}
次に、主なセクションを作成できます.
func (h *Handler)Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
//
_ = conn.Close()
}
/* */
client := &Client {
conn: conn,
}
h.activeConn.Store(client, 1)
reader := bufio.NewReader(conn)
var fixedLen int64 = 0 // BulkString
var err error
var msg []byte
for {
/* */
if fixedLen == 0 { // CRLF
msg, err = reader.ReadBytes('
')
// \r
if len(msg) == 0 || msg[len(msg) - 2] != '\r' {
errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
_, _ = client.conn.Write(errReply.ToBytes())
}
} else { // BulkString ,
msg = make([]byte, fixedLen + 2)
_, err = io.ReadFull(reader, msg)
// \r
if len(msg) == 0 ||
msg[len(msg) - 2] != '\r' ||
msg[len(msg) - 1] != '
'{
errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
_, _ = client.conn.Write(errReply.ToBytes())
}
// Bulk String ,
fixedLen = 0
}
// IO
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
logger.Info("connection close")
} else {
logger.Warn(err)
}
_ = client.Close()
h.activeConn.Delete(client)
return // io error, disconnect with client
}
/* */
if !client.sending.Get() {
// sending == false
if msg[0] == '*' {
//
expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
_, _ = client.conn.Write(UnknownErrReplyBytes)
continue
}
//
client.waitingReply.Add(1) // ,
client.sending.Set(true) //
//
client.expectedArgsCount = uint32(expectedLine)
client.receivedCount = 0
client.args = make([][]byte, expectedLine)
} else {
// TODO: text protocol
}
} else {
// ( )
line := msg[0:len(msg)-2] //
if line[0] == '$' {
// BulkString , String
fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
errReply := &reply.ProtocolErrReply{Msg:err.Error()}
_, _ = client.conn.Write(errReply.ToBytes())
}
if fixedLen <= 0 {
errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
_, _ = client.conn.Write(errReply.ToBytes())
}
} else {
//
client.args[client.receivedCount] = line
client.receivedCount++
}
//
if client.receivedCount == client.expectedArgsCount {
client.sending.Set(false)
//
result := h.db.Exec(client.args)
if result != nil {
_, _ = conn.Write(result.ToBytes())
} else {
_, _ = conn.Write(UnknownErrReplyBytes)
}
// ,
client.expectedArgsCount = 0
client.receivedCount = 0
client.args = nil
client.waitingReply.Done()
}
}
}
}