route_guide exampleを使いまわした gRPC + Python + 非同期の勉強


前提

  • 2018-04-29時点の情報
  • macOS Sierra + Python 3.6.5 + grpcio 1.11.0
  • あんまりよくわかっていません

きっかけ

とりあえずこのチュートリアル自体はやるとして

An asynchronous call to GetFeature is similar, but like calling a local method asynchronously in a thread pool:

そこを知りたいんですけど…… (サンプルコードでは使っていない)

feature_future = stub.GetFeature.future(point)
feature = feature_future.result()

future と書くからには concurrent.futures.Future asyncio.Future のあたりと似ていることくらいはしていると期待できるのでしょう。さぁ調べょ。

※蛇足ですが上の Future は似て非なるまたひとつのPythonの凶器だと思うのです

route_guide_server.py を若干いじる

    def RecordRoute(self, request_iterator, context):
        point_count = 0
        feature_count = 0
        distance = 0.0
        prev_point = None

        print('RecordRoute started on server side')
        start_time = time.time()
        for i, point in enumerate(request_iterator):
            print('RecordRoute {}, {}'.format(i, point))
            point_count += 1
            if get_feature(self.db, point):
                feature_count += 1
            if prev_point:
                distance += get_distance(prev_point, point)
            prev_point = point
            time.sleep(3)

        elapsed_time = time.time() - start_time
        print('RecordRoute finished on server side')
        return route_guide_pb2.RouteSummary(
            point_count=point_count,
            feature_count=feature_count,
            distance=int(distance),
            elapsed_time=int(elapsed_time))

で、サーバは起動しときます

$ python route_guide_server.py

route_guide_client.py のコード断片をターミナルから使う

今回注目するのはprotobufで言う以下の部分

// Interface exported by the server.
service RouteGuide {
// ...
  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}
// ...
}
>>> import grpc, route_guide_pb2_grpc, route_guide_client, route_guide_resources
>>> stub = route_guide_pb2_grpc.RouteGuideStub(grpc.insecure_channel('localhost:50051'))
>>> route_summary = stub.RecordRoute(route_guide_client.generate_route(route_guide_resources.read_route_guide_database()))
# 本来のサンプルと異なりループに time.sleep(3) を挟んだのでがっつり待たされる
>>> route_summary
point_count: 10
feature_count: 10
distance: 766193
elapsed_time: 30

では、本題のFutureとやら使ってみましょ

>>> stub = route_guide_pb2_grpc.RouteGuideStub(grpc.insecure_channel('localhost:50051'))
>>> route_summary_future = stub.RecordRoute.future(route_guide_client.generate_route(route_guide_resources.read_route_guide_database()))
# 今回はすぐ戻ってくる。非同期ってやつカッ?
>>> route_summary_future
<_Rendezvous object of in-flight RPC>
>>> dir(route_summary_future)
['__abstractmethods__', '__cause__', '__class__', '__context__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__next__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__suppress_context__', '__traceback__', '__weakref__', '_abc_cache', '_abc_negative_cache', '_abc_negative_cache_version', '_abc_registry', '_call', '_deadline', '_next', '_repr', '_response_deserializer', '_state', 'add_callback', 'add_done_callback', 'args', 'cancel', 'cancelled', 'code', 'details', 'done', 'exception', 'initial_metadata', 'is_active', 'next', 'result', 'running', 'time_remaining', 'traceback', 'trailing_metadata', 'with_traceback']
>>> route_summary_future.done()
False
>>> route_summary_future.running()
True
>>> route_summary_future.time_remaining()
(Noneなので出力がない)
>>> import concurrent.futures
>>> isinstance(route_summary_future, concurrent.futures.Future)
False
>>> import asyncio
>>> isinstance(route_summary_future, asyncio.Future)
False
# そうこうしているうちにサーバの処理が終わるので
>>> route_summary_future.done()
True
>>> route_summary_future.code()
<StatusCode.OK: (0, 'ok')>
>>> route_summary_future.result()
point_count: 10
feature_count: 10
distance: 649745
elapsed_time: 30

上記に書かれている通りで実際には Future ではなく _Rendezvous (らんでぶー) オブジェクトと主張されるので、 Future そのものというより挙動を似せたダックタイピング的なノリに見えます。別にis-a関係使いませんのでどうでも良いですがmypyとの相性が良いかは怪しい。

このふたつの関係はどっかの同人誌に書いたので略。

#追記: https://grpc.io/grpc/python/grpc.html#grpc.Future

time_remaining() は「終了予定時間」ではなくて「タイムアウト」に関する情報のようです(あるなら)

time_remaining() method of grpc._channel._Rendezvous instance
    Describes the length of allowed time remaining for the RPC.

    Returns:
      A nonnegative float indicating the length of allowed time in seconds
      remaining for the RPC to complete before it is considered to have
      timed out, or None if no deadline was specified for the RPC

add_callback() の存在意義はちょっと分かりませんが、 add_done_callback() は引数としてFuture (likeなオブジェクト)を受け取る関数を引数に取ることもあって、本来の Future とも用途が類似しているので、基本的には非同期コールバックはこれを使えば良さそうという気がします。ので、試す。

>>> route_summary_future = stub.RecordRoute.future(route_guide_client.generate_route(route_guide_resources.read_route_guide_database()))
>>> route_summary_future.add_done_callback(lambda fut: print(fut.result()))
>>> (プロンプトはすぐ返ってくる。そして非同期に)
point_count: 10
feature_count: 10
distance: 829044
elapsed_time: 30

ちなみに route_summary_future.add_done_callback() を記述した時点で終了してるとすぐ実行されるようです。イージーミスのイベント取りこぼしはなさそう?

#追記: https://grpc.io/grpc/python/grpc.html#grpc.RpcContext add_callback()はこちら派生でした

少しだけですが、エラー処理のあたりも見ておきます。サーバの処理が完了する前にクライアント側から cancel() してみます。

>>> route_summary_future = stub.RecordRoute.future(route_guide_client.generate_route(route_guide_resources.read_route_guide_database()))
>>> routroute_summary_future.add_done_callback(lambda fut: print(fut.code(), fut.cancelled()))
>>> route_summary_future.cancel()
False
StatusCode.CANCELLED True   <--- これは非同期に出力される

上記の文脈でコールバックで、状態を確認もせずに fut.result() を呼ぶとgrpc.FutureCancelledError例外が飛びます。

非同期処理中にサーバをCtrl-Cで止めてみます

>>> route_summary_future = stub.RecordRoute.future(route_guide_client.generate_route(route_guide_resources.read_route_guide_database()))
>>> route_summary_future.add.add_done_callback(lambda fut: print(fut.code(), fut.cancelled(), fut.done(), fut.details()))
>>> StatusCode.INTERNAL False True Received RST_STREAM with error code 2   <--- 非同期で出力される

StatusCode.INTERNAL では意味が分かりませんが、以下のような感じのようです

cancelled() が False なのでコールバック時にどのような状態にあるかは丁寧に見てあげる必要がありそうですが、 fut.details() とか色々揃ってるので必要な情報が揃わない、ということはこの調査の範囲では思いつきません。

細かい点はこれ以上ここでは追いません。とりあえずの結論として、実行の完了、明示的な中断、接続断等による失敗等、ささっと思いつくケースは割と簡単に処理出来るようで安心しました。

良くわからない点として、例えば同じprotobufにある

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

特に returns で stream としている場合、 Future っぽいオブジェクトを取って非同期にやり合うということが出来るかよく分かりませんでした。素直に同期でやるか、Thread かなにかを使うんでしょうかね。asyncio使える気ははっきり言ってしませんが(使うメリットも無い気がしますが)、使えたら教えてくりゃれ?

まとめ

素直な良い子でした。