[gRPC (2)] Examples of the two stubs and the four modes (unary, client streaming, server streaming, bidirectional streaming)
Protobuf definition:
syntax = "proto3";

package com.liyao;

option java_package = "com.liyao.protobuf.test.service";
option java_outer_classname = "MyServiceProto";
option java_multiple_files = true;

message MyRequest {
    repeated uint32 keys = 1;
}

message MyResponse {
    string value = 1;
}

service MyService {
    rpc GetByKey (MyRequest) returns (MyResponse);
    rpc GetByKeyServerStream (MyRequest) returns (stream MyResponse);
    rpc GetByKeyClientStream (stream MyRequest) returns (MyResponse);
    rpc GetByKeyBiStream (stream MyRequest) returns (stream MyResponse);
}
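The service is fairly simple: a request carries a list of integer keys, and the service returns the value that corresponds to each key.
Server-side implementation class: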
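import com.google.common.collect.ImmutableMap;
import com.liyao.protobuf.test.service.MyRequest;
import com.liyao.protobuf.test.service.MyResponse;
import com.liyao.protobuf.test.service.MyServiceGrpc;
import io.grpc.stub.StreamObserver;
import java.util.Map;

public class MyRpcServiceImpl extends MyServiceGrpc.MyServiceImplBase {

    // In-memory key/value store shared by all four methods.
    private final Map<Integer, String> map = ImmutableMap.<Integer, String>builder()
            .put(1, "v1")
            .put(2, "v2")
            .put(3, "v3")
            .put(4, "v4")
            .put(5, "v5")
            .build();

    // Unary: one request, one response. Only the first key is looked up.
    @Override
    public void getByKey(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        int key = request.getKeys(0);
        String value = map.getOrDefault(key, "null");
        responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
        responseObserver.onCompleted();
    }

    // Server streaming: one request, one response per key in the request.
    @Override
    public void getByKeyServerStream(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        for (int key : request.getKeysList()) {
            String value = map.getOrDefault(key, "null");
            responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
        }
        responseObserver.onCompleted();
    }

    // Client streaming: many requests, one response with all values concatenated.
    @Override
    public StreamObserver<MyRequest> getByKeyClientStream(StreamObserver<MyResponse> responseObserver) {
        return new StreamObserver<MyRequest>() {
            String values = "";

            @Override
            public void onNext(MyRequest myRequest) {
                int key = myRequest.getKeys(0);
                values += map.getOrDefault(key, "null");
            }

            @Override
            public void onError(Throwable throwable) {
                // Client-side errors are ignored in this example.
            }

            @Override
            public void onCompleted() {
                // The client has finished sending: reply once, then close.
                responseObserver.onNext(MyResponse.newBuilder().setValue(values).build());
                responseObserver.onCompleted();
            }
        };
    }

    // Bidirectional streaming: one response is sent for each incoming request.
    @Override
    public StreamObserver<MyRequest> getByKeyBiStream(StreamObserver<MyResponse> responseObserver) {
        return new StreamObserver<MyRequest>() {
            @Override
            public void onNext(MyRequest myRequest) {
                int key = myRequest.getKeys(0);
                String value = map.getOrDefault(key, "null");
                responseObserver.onNext(MyResponse.newBuilder().setValue(value).build());
            }

            @Override
            public void onError(Throwable throwable) {
                // Client-side errors are ignored in this example.
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}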
Server startup class:
import io.grpc.Server;
import java.io.IOException;

public class RpcServer {

    public static final int port = 8088;

    public static void main(String[] args) throws IOException, InterruptedException {
        MyRpcServiceImpl service = new MyRpcServiceImpl();
        Server server = io.grpc.ServerBuilder
                .forPort(port)
                .addService(service)
                .build();
        server.start();
        // Block the main thread until the server is shut down.
        server.awaitTermination();
    }
}
Client startup class:
import com.liyao.protobuf.test.service.MyRequest;
import com.liyao.protobuf.test.service.MyResponse;
import com.liyao.protobuf.test.service.MyServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;

public class RpcClient {

    private static ManagedChannel channel = ManagedChannelBuilder
            .forAddress("127.0.0.1", RpcServer.port)
            .usePlaintext()
            .build();

    private static MyServiceGrpc.MyServiceBlockingStub blockingStub = MyServiceGrpc.newBlockingStub(channel);
    private static MyServiceGrpc.MyServiceStub asyncStub = MyServiceGrpc.newStub(channel);

    // Shared response observer used by every asynchronous call below.
    private static final StreamObserver<MyResponse> responseObserver = new StreamObserver<MyResponse>() {
        @Override
        public void onNext(MyResponse response) {
            System.out.println("receive: " + response.getValue());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("completed");
        }
    };

    public static void main(String[] args) throws InterruptedException {
        simpleSync();
        simpleAsync();
        serverStreamSync();
        serverStreamAsync();
        clientStream();
        biStream();
        // Keep the JVM alive long enough for the asynchronous callbacks to arrive.
        Thread.sleep(100000);
    }

    // Unary call through the blocking stub.
    private static void simpleSync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .build();
        String value = blockingStub.getByKey(request).getValue();
        System.out.println(value);
    }

    // Unary call through the async stub.
    private static void simpleAsync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .build();
        asyncStub.getByKey(request, responseObserver);
    }

    // Server streaming through the blocking stub: responses arrive as an Iterator.
    private static void serverStreamSync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .addKeys(2)
                .addKeys(3)
                .build();
        Iterator<MyResponse> itr = blockingStub.getByKeyServerStream(request);
        while (itr.hasNext()) {
            System.out.println(itr.next());
        }
    }

    // Server streaming through the async stub.
    private static void serverStreamAsync() {
        MyRequest request = MyRequest.newBuilder()
                .addKeys(1)
                .addKeys(2)
                .addKeys(3)
                .build();
        asyncStub.getByKeyServerStream(request, responseObserver);
    }

    // Client streaming: requests are written to the observer returned by the stub.
    private static void clientStream() {
        StreamObserver<MyRequest> requestData = asyncStub.getByKeyClientStream(responseObserver);
        for (int i = 1; i <= 5; i++) {
            requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
        }
        requestData.onCompleted();
    }

    // Bidirectional streaming: same calling pattern as client streaming.
    private static void biStream() {
        StreamObserver<MyRequest> requestData = asyncStub.getByKeyBiStream(responseObserver);
        for (int i = 1; i <= 5; i++) {
            requestData.onNext(MyRequest.newBuilder().addKeys(i).build());
        }
        requestData.onCompleted();
    }
}
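With the synchronous (blocking) stub, only the unary and server-streaming methods can be called. With the asynchronous stub, any of the four methods can be called.
Unary and server-streaming calls are straightforward to write. For client streaming (and bidirectional streaming), you pass in an observer for the responses, and the stub returns a StreamObserver for the request side; you send each request through it with onNext and finish with onCompleted.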
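The observer hand-off in the client-streaming case can be easier to see in isolation. The following is only a minimal sketch in plain Java, not gRPC code: `Observer` is a hypothetical stand-in for `io.grpc.stub.StreamObserver`, `STORE` and `call` are made-up names, and integers and strings replace `MyRequest` and `MyResponse` so that the snippet runs without any gRPC dependency. It also assumes a `CountDownLatch` as one possible way to wait for completion instead of a fixed `Thread.sleep`.

```java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

final class ClientStreamSketch {
    // Assumed sample data, mirroring the map in the service implementation.
    private static final Map<Integer, String> STORE = Map.of(1, "v1", 2, "v2", 3, "v3");

    // Plays the role of getByKeyClientStream: the caller hands in the response
    // observer and receives the request observer to write to.
    static Observer<Integer> getByKeyClientStream(Observer<String> responseObserver) {
        return new Observer<Integer>() {
            private final StringBuilder values = new StringBuilder();

            @Override
            public void onNext(Integer key) {
                values.append(STORE.getOrDefault(key, "null"));
            }

            @Override
            public void onError(Throwable t) {
                responseObserver.onError(t);
            }

            @Override
            public void onCompleted() {
                // All requests received: emit the single response, then close.
                responseObserver.onNext(values.toString());
                responseObserver.onCompleted();
            }
        };
    }

    // Caller side: build the response observer, stream the keys, wait for the result.
    static String call(int... keys) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        Observer<String> responseObserver = new Observer<String>() {
            @Override
            public void onNext(String value) {
                result.set(value);
            }

            @Override
            public void onError(Throwable t) {
                done.countDown();
            }

            @Override
            public void onCompleted() {
                done.countDown();
            }
        };

        Observer<Integer> requests = getByKeyClientStream(responseObserver);
        for (int key : keys) {
            requests.onNext(key);
        }
        requests.onCompleted();

        try {
            // With a real async stub the callbacks run on another thread,
            // so the caller waits on the latch rather than sleeping blindly.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the response");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(call(1, 2, 3, 9)); // prints "v1v2v3null"
    }
}
```

In this sketch everything runs on the calling thread, so the latch is already released when `await` is reached; with a real channel the same structure would let `main` return as soon as `onCompleted` or `onError` fires.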
For more details, see https://blog.csdn.net/u010900754/article/details/106203724