golang+grpcの備忘録


始めに

 grpcのクイックスタートと大差ないけど自分用に書いておく。
 ソース:https://github.com/lightstaff/grpc_test

GRPCの準備

 ここはさすがに上記のクイックスタート通りで…

protocolの準備

 **.protoに定義を書く。今回は単純にHelloって文字列を持つstructを返すサービス(GetHello)とstreamを使って受けた文字列を大文字化して返すサービス(UpperCharacters)を定義する。
 インポートしている[github.com/gogo/protobuf/gogoproto/gogo.proto]はgeneratorを拡張して色々してくれるので便利です(以下省略)。

protobuf.proto

syntax = "proto3";

package gprc_test;

// 色々便利
import "github.com/gogo/protobuf/gogoproto/gogo.proto";

option go_package = "protobuf";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;

// サービス定義
service GRPCTestServcie {
    // Helloと返すだけのサービス
    rpc GetHello(Empty) returns (ReplyModel) {}

    // stream経由で受けた文字列を大文字化して返すサービス
    rpc UpperCharacters(stream ReqModel) returns (stream ReplyModel) {}
}

// 空
message Empty {}

// Request
message ReqModel {
    string message = 1;
}

// Replay
message ReplyModel {
    string result = 1;
}

上記を
protoc --proto_path=$GOPATH/src:$GOPATH/src/github.com/gogo/protobuf/protobuf:. --gofast_out=plugins=grpc:. ./protobuf/protobuf.proto
でgoのプロトコル(protobuf/protobuf.pb.go)ができます。

サーバーサイド

 protobuf/protobuf.pb.goを参照し、サーバーを書きます。

 現在、1.6だったか1.7だったかで標準化されたcontextは使えません。golang.org/x/net/contextを使う必要があります。混同に注意。

service.go

package main

import (
    "io"
    "strings"

    pb "github.com/lightstaff/grpc_test/protobuf"

    netCtx "golang.org/x/net/context"
)

// Service model
type Service struct{}

// 単純にHelloと返す
func (s *Service) GetHello(ctx netCtx.Context, e *pb.Empty) (*pb.ReplyModel, error) {
    return &pb.ReplyModel{
        Result: "Hello",
    }, nil
}

// stream経由で受けた文字列を大文字化して返す
func (s *Service) UpperCharacters(stream pb.GRPCTestServcie_UpperCharactersServer) error {
    for {
        // streamが終了するまで受信し続ける
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        // 受けたReqModelから大文字化してstreamにReplyModelを送信
        if err := stream.Send(&pb.ReplyModel{
            Result: strings.ToUpper(req.Message),
        }); err != nil {
            return err
        }
    }

    return nil
}

main.go

package main

import (
    "net"
    "os"
    "os/signal"
    "syscall"

    pb "github.com/lightstaff/grpc_test/protobuf"

    "google.golang.org/grpc"
)

func main() {
    g := grpc.NewServer()
    s := &Service{}

    pb.RegisterGRPCTestServcieServer(g, s)

    errC := make(chan error)

    go func() {
        lis, err := net.Listen("tcp", ":18080")
        if err != nil {
            errC <- err
        }

        if err := g.Serve(lis); err != nil {
            errC <- err
        }
    }()

    quitC := make(chan os.Signal)
    signal.Notify(quitC, syscall.SIGINT, syscall.SIGTERM)

    select {
    case err := <-errC:
        panic(err)
    case <-quitC:
        g.Stop()
    }
}

 これでlocalhost:18080にダイアルすることができるようになります。

クライアントサイド

 クライアントはWebサービスを想定し、echoを使います。

controller.go

package main

import (
    "io"
    "net/http"

    pb "github.com/lightstaff/grpc_test/protobuf"

    "github.com/labstack/echo"
    netCtx "golang.org/x/net/context"
)

// Heloと返すだけ
func GetHello(c echo.Context) error {
    sc, ok := c.(*ServiceContext)
    if !ok {
        return echo.NewHTTPError(http.StatusBadRequest, "コンテキストが取得できません")
    }

    rep, err := sc.ServiceClient.GetHello(netCtx.Background(), &pb.Empty{})
    if err != nil {
        return echo.NewHTTPError(http.StatusBadRequest, err.Error())
    }

    return c.JSON(http.StatusOK, map[string]interface{}{
        "reply": rep.Result,
    })
}

// stream経由で受けた文字列を大文字化して返すサービスを呼び出してやりとり
func UpperCharacters(c echo.Context) error {
    sc, ok := c.(*ServiceContext)
    if !ok {
        return echo.NewHTTPError(http.StatusBadRequest, "コンテキストが取得できません")
    }

    type bodyModel struct {
        Messages []string `json:"messages"`
    }

    // JSONを変換
    var m bodyModel
    if err := c.Bind(&m); err != nil {
        return echo.NewHTTPError(http.StatusBadRequest, err.Error())
    }

    // streamを生成
    stream, err := sc.ServiceClient.UpperCharacters(netCtx.Background())
    if err != nil {
        return echo.NewHTTPError(http.StatusBadRequest, err.Error())
    }

    // 受信はgoroutineで
    errC := make(chan error)
    resultC := make(chan *pb.ReplyModel)
    doneC := make(chan struct{})
    go func() {
        defer func() {
            close(errC)
            close(resultC)
            close(doneC)
        }()

        for {
            res, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                errC <- err
                return
            }

            resultC <- res
        }
    }()

    // 文字列をsteamに送る
    for _, message := range m.Messages {
        if err := stream.Send(&pb.ReqModel{
            Message: message,
        }); err != nil {
            return echo.NewHTTPError(http.StatusBadRequest, err.Error())
        }
    }

    if err := stream.CloseSend(); err != nil {
        return echo.NewHTTPError(http.StatusBadRequest, err.Error())
    }

    // この辺もうちょっとスマートに書きたい…
    results := make([]string, 0)
    for {
        select {
        case err := <-errC:
            if err != nil {
                return echo.NewHTTPError(http.StatusBadRequest, err.Error())
            }
        case result := <-resultC:
            if result != nil {
                results = append(results, result.Result)
            }
        case <-doneC:
            return c.JSON(http.StatusOK, map[string]interface{}{
                "results": results,
            })
        }
    }
}

main.go

package main

import (
    "context"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    pb "github.com/lightstaff/grpc_test/protobuf"

    "google.golang.org/grpc"
    "github.com/labstack/echo"
)

type ServiceContext struct {
    echo.Context
    ServiceClient pb.GRPCTestServcieClient
}

// このMiddlewareでGRPCにダイアル
func serviceContextMiddleware(grpcAddr string) echo.MiddlewareFunc {
    return func(next echo.HandlerFunc) echo.HandlerFunc {
        return func(c echo.Context) error {
            cc, err := grpc.Dial(grpcAddr, grpc.WithBlock(), grpc.WithInsecure())
            if err != nil {
                return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
            }
            defer cc.Close()

            sc := &ServiceContext{
                Context:       c,
                ServiceClient: pb.NewGRPCTestServcieClient(cc),
            }

            return next(sc)
        }
    }
}

func main() {
    e := echo.New()

    e.Use(serviceContextMiddleware("localhost:18080"))

    e.GET("/hello", GetHello)
    e.POST("/upper-characters", UpperCharacters)

    errC := make(chan error)
    go func() {
        if err := e.Start(":8080"); err != nil {
            errC <- err
        }
    }()

    quitC := make(chan os.Signal)
    signal.Notify(quitC, syscall.SIGINT, syscall.SIGTERM)

    select {
    case err := <-errC:
        panic(err)
    case <-quitC:
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        if err := e.Shutdown(shutdownCtx); err != nil {
            errC <- err
        }
    }
}

 /upper-charactersにはJSONで'{"messages:["aaa","bbb","ccc"]}'と渡してあげてください。
 ちなみにサーバーからReplyModelが帰ってくる前にメソッドが終了し、ServiceClientが解放されるとサーバーが送信先を失っていまいちなエラーが起こります。