grpcエラー処理

6624 ワード

rpc呼び出し異常処理は考慮すべき一態様であり,grpcにおける異常に対する処理を紹介する.注:ここの例はすべて前のインタフェースを改造したものです.
単純
単純モードはrequestがresponseに対応するものです.
//   
public void simpleHello(ProtoObj.Person request,
                  StreamObserver responseObserver) {
    System.out.println(request.getMyName()+" calling");
    //       Exception Status       ,      Throwable,           
    responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException());
    //     onError   complete    complete
    //responseObserver.onCompleted();
}
//   
@Test
public void  simple() throws InterruptedException {
    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
    //simple
    System.out.println("---simple rpc---");
    try {
        //try catch    
        System.out.println(blockingStub.simpleHello(person).getString());
    }catch(Exception e){
        //      status           
        Status status = Status.fromThrowable(e);
        status.asException().printStackTrace();
    }
    channel.shutdown();
}
//  
---simple rpc---
io.grpc.StatusException: INTERNAL: error desc
    at io.grpc.Status.asException(Status.java:548)
    at blog.HelloClient.simple(HelloClient.java:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    ...

クライアントフロー
クライアントフロー式は、複数のrequestが単一responseに対応するものである.
//   
@Override
public StreamObserver clientStreamHello(
       final StreamObserver responseObserver) {
   return new StreamObserver(){
       private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
       @Override
       public void onNext(ProtoObj.Person value) {
           builder.setString(builder.getString() +"," + value.getMyName());
       }

       @Override
       public void onError(Throwable t) {
           responseObserver.onError(new Exception("custom error"));
       }

       @Override
       public void onCompleted() {
           builder.setString("hello"+builder.getString());
           //    
           responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException());
           //responseObserver.onNext(builder.build());
           //responseObserver.onCompleted();
       }
   };
}
//   
@Test
public void client() throws InterruptedException {

    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
    //  latch       
    final CountDownLatch latch = new CountDownLatch(1);

    //client side
    System.out.println("---client stream rpc---");
    StreamObserver responseObserver = new StreamObserver() {
        @Override
        public void onNext(ProtoObj.Result result) {
            System.out.println("client stream--" + result.getString());
        }

        @Override
        public void onError(Throwable t) {
            latch.countDown();
            //    
            Status status = Status.fromThrowable(t);
            status.asException().printStackTrace();
        }

        @Override
        public void onCompleted() {
            latch.countDown();
        }
    };
    //    
    StreamObserver clientStreamObserver = asyncStub.clientStreamHello(responseObserver);
    clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
    clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
    clientStreamObserver.onCompleted();
    //  latch        
    if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) {
        throw new RuntimeException("timeout!");
    }
    channel.shutdown();

}
//  
---client stream rpc---
io.grpc.StatusException: INTERNAL: error desc
    at io.grpc.Status.asException(Status.java:548)
    at blog.HelloClient$3.onError(HelloClient.java:148)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:392)

サービスたんりゅうしき
サービス側フロー式は、単一requestが複数のresponseに対応するものである.
//   
@Override
public void serverStreamHello(ProtoObj.Person request,
                        StreamObserver responseObserver) {
    System.out.println(request.getMyName()+" calling");
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
    responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException());
    //error         next 
    //responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+request.getMyName()).build());
    //responseObserver.onCompleted();
}
//   
    @Test
    public void server(){
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
        HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

        ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

        //server side
        System.out.println("---server stream rpc---");
        Iterator it = blockingStub.serverStreamHello(person);
        try{
            while (it.hasNext()) {
                System.out.print(it.next());
            }
        }catch (Exception e){
            Status status = Status.fromThrowable(e);
            status.asException().printStackTrace();
        }
        channel.shutdown();
    }
//  
---server stream rpc---
string: "hello, World"
io.grpc.StatusException: INTERNAL: error desc
    at io.grpc.Status.asException(Status.java:548)
    at blog.HelloClient.server(HelloClient.java:122)
    ...

にほうこうりゅうしき
双方向ストリームは、複数のrequestが複数のresponseに対応するものであり、例外処理はクライアントストリームと類似している(非同期newStubを使用する).ここでは後述しない.
転載先:https://www.cnblogs.com/resentment/p/6883153.html