【Grpc(2)】2つのstub、4つのモード(unary、クライアントstream、サービス側strea)の例

7035 ワード

protobuff定義:
syntax = "proto3";

package com.liyao;

option java_package = "com.liyao.protobuf.test.service";
option java_outer_classname = "MyServiceProto";
option java_multiple_files = true;

message MyRequest {
    repeated uint32 keys = 1;
}

message MyResponse {
    string value = 1;
}

service MyService {
    rpc GetByKey (MyRequest) returns (MyResponse);
    rpc GetByKeyServerStream (MyRequest) returns (stream MyResponse);
    rpc GetByKeyClientStream (stream MyRequest) returns (MyResponse);
    rpc GetByKeyBiStream (stream MyRequest) returns (stream MyResponse);
}

サービスは比較的簡単で、リクエストはintのlistを含んで、対応するkeyを返します.
サービス側実装クラス:
public class MyRpcServiceImpl extends MyServiceGrpc.MyServiceImplBase {

    private final Map map = ImmutableMap.builder()
            .put(1, "v1")
            .put(2, "v2")
            .put(3, "v3")
            .put(4, "v4")
            .put(5, "v5")
            .build();

    @Override
    public void getByKey(MyRequest request, StreamObserver responseObserver) {
        int key = request.getKeys(0);
        String value = map.getOrDefault(key, "null");

        responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
        responseObserver.onCompleted();
    }

    @Override
    public void getByKeyServerStream(MyRequest request, StreamObserver responseObserver) {
        for (int key : request.getKeysList()) {
            String value = map.getOrDefault(key, "null");
            responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
        }
        responseObserver.onCompleted();
    }

    @Override
    public StreamObserver getByKeyClientStream(StreamObserver responseObserver) {
        return new StreamObserver() {
            String values = "";
            @Override
            public void onNext(MyRequest myRequest) {
                int key = myRequest.getKeys(0);
                values += map.getOrDefault(key, "null");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(MyResponse.newBuilder().setValue(values).build());
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public StreamObserver getByKeyBiStream(StreamObserver responseObserver) {
        return new StreamObserver() {
            @Override
            public void onNext(MyRequest myRequest) {
                int key = myRequest.getKeys(0);
                String value = map.getOrDefault(key, "null");
                responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

サービス側起動クラス:
public class RpcServer {
    public static final int port = 8088;

    public static void main( String[] args ) throws IOException, InterruptedException {
        MyRpcServiceImpl service = new MyRpcServiceImpl();
        Server server = io.grpc.ServerBuilder
                .forPort(port)
                .addService(service)
                .build();
        server.start();
        server.awaitTermination();
    }
}

クライアント起動クラス:
public class RpcClient {
    private static ManagedChannel channel = ManagedChannelBuilder
            .forAddress("127.0.0.1", RpcServer.port)
            .usePlaintext()
            .build();

    private static MyServiceGrpc.MyServiceBlockingStub blockingStub = MyServiceGrpc.newBlockingStub(channel);
    private static MyServiceGrpc.MyServiceStub asyncStub = MyServiceGrpc.newStub(channel);

    private static final StreamObserver responseObserver = new StreamObserver() {
        @Override
        public void onNext(MyResponse response) {
            System.out.println("receive: " + response.getValue());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
        }
    };

    public static void main(String[] args) throws InterruptedException {
        simpleSync();
        simpleAsync();
        serverStreamSync();
        serverStreamAsync();
        clientStream();
        biStream();

        Thread.sleep(100000);
    }

    private static void simpleSync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .build();
        String value = blockingStub.getByKey(request).getValue();
        System.out.println(value);
    }

    private static void simpleAsync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .build();
        asyncStub.getByKey(request, responseObserver);
    }

    private static void serverStreamSync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .addKeys(2)
                .addKeys(3)
                .build();
        Iterator itr = blockingStub.getByKeyServerStream(request);
        while (itr.hasNext()) {
            System.out.println(itr.next());
        }
    }

    private static void serverStreamAsync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .addKeys(2)
                .addKeys(3)
                .build();
        asyncStub.getByKeyServerStream(request, responseObserver);
    }

    private static void clientStream() {
        StreamObserver requestData = asyncStub.getByKeyClientStream(responseObserver);
        for (int i = 1; i <= 5; i++) {
            requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
        }
        requestData.onCompleted();
    }

    private static void biStream() {
        StreamObserver requestData = asyncStub.getByKeyBiStream(responseObserver);
        for (int i = 1; i <= 5; i++) {
            requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
        }
        requestData.onCompleted();
    }
}

同期stubの場合、unaryおよびサービス側streamのメソッドのみが呼び出されます.非同期stubの場合、任意のメソッドを呼び出すことができます.
unaryおよびサービス側streamの書き方は簡単です.クライアントstreamの場合、リクエストパラメータのobserverを構築する必要があります.
より具体的に見るとhttps://blog.csdn.net/u010900754/article/details/106203724