grpc-goのInterceptorを使ってみる


始めに

 前回の延長でちょっと分かりにくいgrpc-goのInterceptorを使ってみようと思います。

 参考:go-grpc-middleware

Interceptorとは

 WebFramework系で言うとこのMiddlewareという認識で良いのではないかと思います。RPCメソッドの呼出に割り込んで事前・事後処理を実行することができます。
 サーバーサイド・クライアントサイドどちらでも使えるようですが、今回はサーバーサイドのみに使っています。

※grpc-goのソースにまだ実験的なものとあったので今後に注意必要かも

実装

 InterceptorにはUnaryInterceptorとStreamInterceptorの2種類が用意されており、単純なリクエストとリプライを返すRPCメソッドにはUnaryInterceptorを、Streamを使うRPCメソッドにはStreamInterceptorを使います。
 前回のサーバーサイドにInterceptorを用いて処理後のロギングを実装してみようと思います(Logrusを使います)。

server/main.goより抜粋

// 単項用Interceptor
func unaryServerInterceptor(logger *logrus.Logger) grpc.UnaryServerInterceptor {
    return func(ctx netCtx.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // deferを使って処理後にログを出力
        var err error
        defer func(begin time.Time) {
            // infoからメソッド名を取得
            method := path.Base(info.FullMethod)
            took := time.Since(begin)
            fields := logrus.Fields{
                "method": method,
                "took":   took,
            }
            if err != nil {
                fields["error"] = err
                logger.WithFields(fields).Error("Failed")
            } else {
                logger.WithFields(fields).Info("Successed")
            }
        }(time.Now())

        // handler = RPCメソッド
        reply, hErr := handler(ctx, req)
        if hErr != nil {
            err = hErr
        }

        return reply, err
    }
}
// Stream用Iterceptor
func streamServerInterceptor(logger *logrus.Logger) grpc.StreamServerInterceptor {
    return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        // deferを使って処理後にログを出力
        var err error
        defer func(begin time.Time) {
            // infoからメソッド名を取得
            method := path.Base(info.FullMethod)
            took := time.Since(begin)
            fields := logrus.Fields{
                "method": method,
                "took":   took,
            }
            if err != nil {
                fields["error"] = err
                logger.WithFields(fields).Error("Failed")
            } else {
                logger.WithFields(fields).Info("Successed")
            }
        }(time.Now())

        // handler = RPCメソッド
        if hErr := handler(srv, stream); err != nil {
            err = hErr
        }

        return err
    }
}

 
 UnaryServerInterceptorでは引数にあるコンテキストをいじってhandlerに渡すことも可能です。StreamServerInterceptorではコンテキストは引数stream内に内包されており、取得は可能ですが、戻すことはできない(=編集できない)模様です。
 今回、参考にしたgo-grpc-middlewareではstreamをラッパーしてコンテキストを編集していたりします。

 これらをサーバーのオプションとして渡します。

server/main.goより抜粋

logger := logrus.New()

ops := make([]grpc.ServerOption, 0)
ops = append(ops, grpc.UnaryInterceptor(unaryServerInterceptor(logger)))
ops = append(ops, grpc.StreamInterceptor(streamServerInterceptor(logger)))

g := grpc.NewServer(ops...)

 これでInterceptorの実装は完了です。

最後に

 Streamの方で素直にコンテキストをいじれたらなぁって思いましたとさ。今のところ、ロギングか認証周りくらいしか使い道を思いつかなかったりします。

 前回作ったものをInterceptorを追加して更新しています。
 https://github.com/lightstaff/grpc_test