RubyでgRPCのストリーミング


概要

  • gRPCの双方向なストリーミング機能をRubyで利用する方法

前提

  • 以下でgRPC用のgemをインストール
$ gem install grpc
$ gem install grpc-tools

方法

protoファイル

  • service定義で、パラメータと戻り値に"stream"を付ける
sample.proto
syntax = 'proto3';

package sample;

service Echo {
    rpc Talk (stream Request) returns (stream Response);
}

message Request {
    string message = 1;
}

message Response {
    string message = 1;
}

コンパイル

grpc_tools_ruby_protoc --ruby_out=lib --grpc_out=lib sample.proto
  • 上記protoファイルをコンパイル
    • sample_pb.rbとsample_services_pb.rbが生成される

プログラム実装(例)

  • サーバ側、クライアント側双方、Enumeratorを介してやりとりする

サーバ側

sample_server.rb
#!/usr/bin/env ruby

this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(this_dir, 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)

require 'grpc'
require 'sample_services_pb'

class EchoServer < Sample::Echo::Service
  def talk(requests)
    return enum_for(:talk, requests) unless block_given?

    requests.each do |request|
      sleep rand(5)
      message = "received (#{request.message})"
      yield Sample::Response.new(message: message)
    end
  end
end

def main
  s = GRPC::RpcServer.new
  s.add_http2_port('localhost:50051', :this_port_is_insecure)
  s.handle(EchoServer)
  s.run_till_terminated
end

main
  • サーバ用クラスのメソッドの引数には、クライアント側から渡されたEnumeratorなオブジェクト(後述)が渡ってくる
  • 上記Enumeratorオブジェクトに対して、eachでブロックを渡す
    • yieldでレスポンス用のオブジェクトを渡すようにしておく
  • メソッドの戻り値としてEnumeratorオブジェクトを返す
    • 実行時にはブロックなしで呼ばれるので、ブロックなしで呼ばれたらObject#enum_forで自メソッドをEnumerator化して返すようにしておく
    • このEnumeratorオブジェクトがクライアント側のスタブの戻り値になる

クライアント側

  • サーバ側がObject#enum_forだったので、サンプル的にEnumerator.newで
sample_client.rb
#!/usr/bin/env ruby

this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(this_dir, 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)

require 'grpc'
require 'sample_services_pb'

def main
  requests = Enumerator.new do |yielder|
    loop do
      print "\ninput: "
      message = STDIN.gets.chomp
      next if message.size == 0
      break if %w(quit exit).include?(message)

      yielder << Sample::Request.new(message: message)
    end
  end

  stub = Sample::Echo::Stub.new('localhost:50051', :this_channel_is_insecure)

  responses = stub.talk(requests)
  responses.each do |response|
    puts "\nresponse: #{response.message}"
  end
end

main
  • リクエスト用にスタブのメソッドにEnumeratorオブジェクトを渡す
    • これがサーバ側メソッドの引数になる
  • スタブの戻り値にサーバ側から返したEnumeratorオブジェクトが渡ってくる
    • each実行でサーバ側メソッド(のenum_forでEnumerator化されたもの)が実行される
    • サーバ側メソッド内でeachが実行されることで、リクエスト用に渡したクライアント側のEnumerator(Enumerator.newしたもの)が実行される
    • Enumerator.newのブロック内でEnumerator::Yielder#<<に渡したリクエスト用オブジェクトがサーバ側のeachのブロック引数として渡る
    • サーバ側でyieldに渡されたレスポンス用オブジェクトが、クライアント側のeachのブロック引数として渡ってくる