grpc java io通信モデル

31286 ワード

1 gRPCサービスのgRPCを呼び出す通信プロトコルは、標準的なHTTP/2設計に基づいており、主に2つのRPC呼び出し方式を提供している。
1   一般的なRPC呼び出し方式は、要求応答モードである。
2  HTTP/2.0のstreamming呼び出し方式に基づいています。
1.1一般RPC呼び出しの一般的なRPC呼び出しは、3つの実施形態を提供する。
1  同期ブロッキングサービスの呼び出しは、通常の実装クラスはxxBlocking Stubである(proto定義に基づいて生成される)。
2  非同期非ブロッキングコールは、Future-Leistenser機構に基づいて、通常の実装クラスはxxFutureStubである。
3  非同期非ブロッキングコールは、Reactive(Async)の応答式プログラミングモードに基づいており、通常の実装クラスはxxStubである。
一般rpc demoは以下の通りである。
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ylifegroup.protobuf.PhoneServiceGrpc;
import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
import com.ylifegroup.protobuf.Phonebook.PhoneType;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

/**
 * @describe GRpcClient Block demo
 * @author zhikai.chen
 * @date 2018 5 7    4:00:58
 */
public class GRpcClientBlock {
	
	private static final Logger logger = LoggerFactory.getLogger(GRpcClientBlock.class);

    private final ManagedChannel channel;

    private final PhoneServiceGrpc.PhoneServiceBlockingStub blockingStub;

    /** Construct client connecting to gRPC server at {@code host:port}. */
    public GRpcClientBlock(String host, int port) {
        ManagedChannelBuilder> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
        channel = channelBuilder.build();
        blockingStub = PhoneServiceGrpc.newBlockingStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /** add phone to user. */
    public void addPhoneToUser(int uid, PhoneType phoneType, String phoneNubmer) {
        logger.info("Will try to add phone to user " + uid);
        AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
                .setPhoneNumber(phoneNubmer).build();
        AddPhoneToUserResponse response;
        try {
            response = blockingStub.addPhoneToUser(request);
        } catch (StatusRuntimeException e) {
            logger.warn("RPC failed: {0} --> "+e.getLocalizedMessage(), e.getStatus());
            return;
        }
        logger.info("Result: " + response.getResult());
    }

    public static void main(String[] args) throws Exception {
        GRpcClientBlock client = new GRpcClientBlock("localhost", 50051);
        try {
            client.addPhoneToUser(1, PhoneType.WORK, "13888888888");
        } finally {
            client.shutdown();
        }
    }

}
Futureモデルのdemoは以下の通りです。
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.ylifegroup.protobuf.PhoneServiceGrpc;
import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
import com.ylifegroup.protobuf.Phonebook.PhoneType;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * @describe GRpcClient Future demo
 * @author zhikai.chen
 * @date 2018 5 7    4:00:58
 */
public class GRpcClientFuture {
	
	private static final Logger logger = LoggerFactory.getLogger(GRpcClientFuture.class);

    private final ManagedChannel channel;

    private final PhoneServiceGrpc.PhoneServiceFutureStub futureStub;

    /** Construct client connecting to gRPC server at {@code host:port}. */
    public GRpcClientFuture(String host, int port) {
        ManagedChannelBuilder> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
        channel = channelBuilder.build();
        futureStub = PhoneServiceGrpc.newFutureStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /** add phone to user. */
    public void addPhoneToUserFuture1(int uid, PhoneType phoneType, String phoneNubmer) {
        logger.info("Will try to add phone to user " + uid);
        AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
        		.setPhoneNumber(phoneNubmer).build();

        try {
        	com.google.common.util.concurrent.ListenableFuture
        	listenableFuture = futureStub.addPhoneToUser(request);
        	Futures.addCallback(listenableFuture, new FutureCallback() {
        		@Override
        		public void onSuccess(@Nullable AddPhoneToUserResponse result) {
        			logger.info("result: " + result.getResult());
        		}
        		@Override
        		public void onFailure(Throwable t) {
        			logger.warn(t.getMessage());
        		}
        	});
        } catch (Exception e) {
        	logger.warn("RPC failed: {0}", e);
        	return;
        }
    }
    
    /** add phone to user. */
    public void addPhoneToUserFuture2(int uid, PhoneType phoneType, String phoneNubmer) {
        logger.info("Will try to add phone to user " + uid);
        AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
        		.setPhoneNumber(phoneNubmer).build();

        try {
        	com.google.common.util.concurrent.ListenableFuture
        	listenableFuture = futureStub.addPhoneToUser(request);
        	listenableFuture.addListener(()->
        	{
        		try {
        			AddPhoneToUserResponse response = listenableFuture.get();
        			logger.info("result: " + response.getResult());
        		}
        		catch(Exception e)
        		{
        			e.printStackTrace();
        		}
        	}, Executors.newFixedThreadPool(1));
        } catch (Exception e) {
        	logger.warn("RPC failed: {0}", e);
        	return;
        }

    }

    public static void main(String[] args) throws Exception {
        GRpcClientFuture client = new GRpcClientFuture("localhost", 50051);
        try {
            client.addPhoneToUserFuture1(1, PhoneType.WORK, "13888888888");
            client.addPhoneToUserFuture2(2, PhoneType.WORK, "13888888888");
            TimeUnit.SECONDS.sleep(3);
            //TODO             
            Runtime.getRuntime().exit(0);
        } finally {
            client.shutdown();
        }
    }

}
Reactiveモデルのdemoは以下の通りです。
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ylifegroup.protobuf.PhoneServiceGrpc;
import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
import com.ylifegroup.protobuf.Phonebook.PhoneType;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * @describe GRpcClient Async demo
 * @author zhikai.chen
 * @date 2018 5 7    4:00:58
 */
public class GRpcClientAsync {
	
    private static final Logger logger = LoggerFactory.getLogger(GRpcClientAsync.class);

    private final ManagedChannel channel;

    private final PhoneServiceGrpc.PhoneServiceStub stub;

    /** Construct client connecting to gRPC server at {@code host:port}. */
    public GRpcClientAsync(String host, int port) {
        ManagedChannelBuilder> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
        channel = channelBuilder.build();
        stub = PhoneServiceGrpc.newStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /** add phone to user. */
    public void addPhoneToUserAsync(int uid, PhoneType phoneType, String phoneNubmer) {
        logger.info("Will try to add phone to user " + uid);
        AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
                .setPhoneNumber(phoneNubmer).build();
        
        io.grpc.stub.StreamObserver responseObserver =
        		new io.grpc.stub.StreamObserver()
        {
        	public  void onNext(AddPhoneToUserResponse response)
        	{
        		logger.info("Result: " + response.getResult());
        	}
        	public void onError(Throwable t){
        		logger.warn(t.getMessage());
        	}
        	public void onCompleted(){}
        };
        stub.addPhoneToUser(request,responseObserver);
    }

    public static void main(String[] args) throws Exception {
        GRpcClientAsync client = new GRpcClientAsync("localhost", 50051);
        try {
            client.addPhoneToUserAsync(1, PhoneType.WORK, "13888888888");
            TimeUnit.SECONDS.sleep(5);
        } finally {
            client.shutdown();
        }
    }

}
1.2 Streamingモードサービス呼び出しはHTTP/2.0に基づいて、gRPCは三つのstreamingモードを提供しています。
1  サービスエンドstreamming
2  クライアントストリーム
3  サービスとクライアントの双方向streamming
サービスエンドのstreaming demoは以下の通りです。
import static java.lang.Math.atan2;
import static java.lang.Math.cos;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.Math.sin;
import static java.lang.Math.sqrt;
import static java.lang.Math.toRadians;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.routeguide.Feature;
import io.grpc.examples.routeguide.Point;
import io.grpc.examples.routeguide.Rectangle;
import io.grpc.examples.routeguide.RouteGuideGrpc;
import io.grpc.examples.routeguide.RouteNote;
import io.grpc.examples.routeguide.RouteSummary;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A sample gRPC server that serve the RouteGuide (see route_guide.proto) service.
 */
public class RouteGuideServer {
  private static final Logger logger = Logger.getLogger(RouteGuideServer.class.getName());

  private final int port;
  private final Server server;

  public RouteGuideServer(int port) throws IOException {
    this(port, RouteGuideUtil.getDefaultFeaturesFile());
  }

  /** Create a RouteGuide server listening on {@code port} using {@code featureFile} database. */
  public RouteGuideServer(int port, URL featureFile) throws IOException {
    this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
  }

  /** Create a RouteGuide server using serverBuilder as a base and features as data. */
  public RouteGuideServer(ServerBuilder> serverBuilder, int port, Collection features) {
    this.port = port;
    server = serverBuilder.addService(new RouteGuideService(features))
        .build();
  }

  /** Start serving requests. */
  public void start() throws IOException {
    server.start();
    logger.info("Server started, listening on " + port);
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        // Use stderr here since the logger may has been reset by its JVM shutdown hook.
        System.err.println("*** shutting down gRPC server since JVM is shutting down");
        RouteGuideServer.this.stop();
        System.err.println("*** server shut down");
      }
    });
  }

  /** Stop serving requests and shutdown resources. */
  public void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  /**
   * Main method.  This comment makes the linter happy.
   */
  public static void main(String[] args) throws Exception {
    RouteGuideServer server = new RouteGuideServer(8980);
    server.start();
    server.blockUntilShutdown();
  }

  /**
   * Our implementation of RouteGuide service.
   *
   * 

See route_guide.proto for details of the methods. */ private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase { private final Collection features; private final ConcurrentMap> routeNotes = new ConcurrentHashMap>(); RouteGuideService(Collection features) { this.features = features; } /** * Gets the {@link Feature} at the requested {@link Point}. If no feature at that location * exists, an unnamed feature is returned at the provided location. * * @param request the requested location for the feature. * @param responseObserver the observer that will receive the feature at the requested point. */ @Override public void getFeature(Point request, StreamObserver responseObserver) { responseObserver.onNext(checkFeature(request)); responseObserver.onCompleted(); } /** * Gets all features contained within the given bounding {@link Rectangle}. * * @param request the bounding rectangle for the requested features. * @param responseObserver the observer that will receive the features. */ @Override public void listFeatures(Rectangle request, StreamObserver responseObserver) { int left = min(request.getLo().getLongitude(), request.getHi().getLongitude()); int right = max(request.getLo().getLongitude(), request.getHi().getLongitude()); int top = max(request.getLo().getLatitude(), request.getHi().getLatitude()); int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude()); for (Feature feature : features) { if (!RouteGuideUtil.exists(feature)) { continue; } int lat = feature.getLocation().getLatitude(); int lon = feature.getLocation().getLongitude(); if (lon >= left && lon <= right && lat >= bottom && lat <= top) { responseObserver.onNext(feature); } } responseObserver.onCompleted(); } /** * Gets a stream of points, and responds with statistics about the "trip": number of points, * number of known features visited, total distance traveled, and total time spent. * * @param responseObserver an observer to receive the response summary. * @return an observer to receive the requested route points. */ @Override public StreamObserver recordRoute(final StreamObserver responseObserver) { return new StreamObserver() { int pointCount; int featureCount; int distance; Point previous; final long startTime = System.nanoTime(); @Override public void onNext(Point point) { pointCount++; if (RouteGuideUtil.exists(checkFeature(point))) { featureCount++; } // For each point after the first, add the incremental distance from the previous point to // the total distance value. if (previous != null) { distance += calcDistance(previous, point); } previous = point; } @Override public void onError(Throwable t) { logger.log(Level.WARNING, "recordRoute cancelled"); } @Override public void onCompleted() { long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime); responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount) .setFeatureCount(featureCount).setDistance(distance) .setElapsedTime((int) seconds).build()); responseObserver.onCompleted(); } }; } /** * Receives a stream of message/location pairs, and responds with a stream of all previous * messages at each of those locations. * * @param responseObserver an observer to receive the stream of previous messages. * @return an observer to handle requested message/location pairs. */ @Override public StreamObserver routeChat(final StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(RouteNote note) { List notes = getOrCreateNotes(note.getLocation()); // Respond with all previous notes at this location. for (RouteNote prevNote : notes.toArray(new RouteNote[0])) { responseObserver.onNext(prevNote); } // Now add the new note to the list notes.add(note); } @Override public void onError(Throwable t) { logger.log(Level.WARNING, "routeChat cancelled"); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; } /** * Get the notes list for the given location. If missing, create it. */ private List getOrCreateNotes(Point location) { List notes = Collections.synchronizedList(new ArrayList()); List prevNotes = routeNotes.putIfAbsent(location, notes); return prevNotes != null ? prevNotes : notes; } /** * Gets the feature at the given point. * * @param location the location to check. * @return The feature object at the point. Note that an empty name indicates no feature. */ private Feature checkFeature(Point location) { for (Feature feature : features) { if (feature.getLocation().getLatitude() == location.getLatitude() && feature.getLocation().getLongitude() == location.getLongitude()) { return feature; } } // No feature was found, return an unnamed feature. return Feature.newBuilder().setName("").setLocation(location).build(); } /** * Calculate the distance between two points using the "haversine" formula. * The formula is based on http://mathforum.org/library/drmath/view/51879.html. * * @param start The starting point * @param end The end point * @return The distance between the points in meters */ private static int calcDistance(Point start, Point end) { int r = 6371000; // earth radius in meters double lat1 = toRadians(RouteGuideUtil.getLatitude(start)); double lat2 = toRadians(RouteGuideUtil.getLatitude(end)); double lon1 = toRadians(RouteGuideUtil.getLongitude(start)); double lon2 = toRadians(RouteGuideUtil.getLongitude(end)); double deltaLat = lat2 - lat1; double deltaLon = lon2 - lon1; double a = sin(deltaLat / 2) * sin(deltaLat / 2) + cos(lat1) * cos(lat2) * sin(deltaLon / 2) * sin(deltaLon / 2); double c = 2 * atan2(sqrt(a), sqrt(1 - a)); return (int) (r * c); } } }

クライアントのstreamming demoは以下の通りです。
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.routeguide.Feature;
import io.grpc.examples.routeguide.Point;
import io.grpc.examples.routeguide.Rectangle;
import io.grpc.examples.routeguide.RouteGuideGrpc;
import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideBlockingStub;
import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideStub;
import io.grpc.examples.routeguide.RouteNote;
import io.grpc.examples.routeguide.RouteSummary;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Sample client code that makes gRPC calls to the server.
 */
public class RouteGuideClient {
  private static final Logger logger = Logger.getLogger(RouteGuideClient.class.getName());

  private final ManagedChannel channel;
  private final RouteGuideBlockingStub blockingStub;
  private final RouteGuideStub asyncStub;

  private Random random = new Random();
  private TestHelper testHelper;

  /** Construct client for accessing RouteGuide server at {@code host:port}. */
  public RouteGuideClient(String host, int port) {
    this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
  }

  /** Construct client for accessing RouteGuide server using the existing channel. */
  public RouteGuideClient(ManagedChannelBuilder> channelBuilder) {
    channel = channelBuilder.build();
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

  public void shutdown() throws InterruptedException {
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }

  /**
   * Blocking unary call example.  Calls getFeature and prints the response.
   */
  public void getFeature(int lat, int lon) {
    info("*** GetFeature: lat={0} lon={1}", lat, lon);

    Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();

    Feature feature;
    try {
      feature = blockingStub.getFeature(request);
      if (testHelper != null) {
        testHelper.onMessage(feature);
      }
    } catch (StatusRuntimeException e) {
      warning("RPC failed: {0}", e.getStatus());
      if (testHelper != null) {
        testHelper.onRpcError(e);
      }
      return;
    }
    if (RouteGuideUtil.exists(feature)) {
      info("Found feature called \"{0}\" at {1}, {2}",
          feature.getName(),
          RouteGuideUtil.getLatitude(feature.getLocation()),
          RouteGuideUtil.getLongitude(feature.getLocation()));
    } else {
      info("Found no feature at {0}, {1}",
          RouteGuideUtil.getLatitude(feature.getLocation()),
          RouteGuideUtil.getLongitude(feature.getLocation()));
    }
  }

  /**
   * Blocking server-streaming example. Calls listFeatures with a rectangle of interest. Prints each
   * response feature as it arrives.
   */
  public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
    info("*** ListFeatures: lowLat={0} lowLon={1} hiLat={2} hiLon={3}", lowLat, lowLon, hiLat,
        hiLon);

    Rectangle request =
        Rectangle.newBuilder()
            .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
            .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
    Iterator features;
    try {
      features = blockingStub.listFeatures(request);
      for (int i = 1; features.hasNext(); i++) {
        Feature feature = features.next();
        info("Result #" + i + ": {0}", feature);
        if (testHelper != null) {
          testHelper.onMessage(feature);
        }
      }
    } catch (StatusRuntimeException e) {
      warning("RPC failed: {0}", e.getStatus());
      if (testHelper != null) {
        testHelper.onRpcError(e);
      }
    }
  }

  /**
   * Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code
   * features} with a variable delay in between. Prints the statistics when they are sent from the
   * server.
   */
  public void recordRoute(List features, int numPoints) throws InterruptedException {
    info("*** RecordRoute");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver responseObserver = new StreamObserver() {
      @Override
      public void onNext(RouteSummary summary) {
        info("Finished trip with {0} points. Passed {1} features. "
            + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
            summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
        if (testHelper != null) {
          testHelper.onMessage(summary);
        }
      }

      @Override
      public void onError(Throwable t) {
        warning("RecordRoute Failed: {0}", Status.fromThrowable(t));
        if (testHelper != null) {
          testHelper.onRpcError(t);
        }
        finishLatch.countDown();
      }

      @Override
      public void onCompleted() {
        info("Finished RecordRoute");
        finishLatch.countDown();
      }
    };

    StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);
    try {
      // Send numPoints points randomly selected from the features list.
      for (int i = 0; i < numPoints; ++i) {
        int index = random.nextInt(features.size());
        Point point = features.get(index).getLocation();
        info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
            RouteGuideUtil.getLongitude(point));
        requestObserver.onNext(point);
        // Sleep for a bit before sending the next one.
        Thread.sleep(random.nextInt(1000) + 500);
        if (finishLatch.getCount() == 0) {
          // RPC completed or errored before we finished sending.
          // Sending further requests won't error, but they will just be thrown away.
          return;
        }
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
      warning("recordRoute can not finish within 1 minutes");
    }
  }

  /**
   * Bi-directional example, which can only be asynchronous. Send some chat messages, and print any
   * chat messages that are sent from the server.
   */
  public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver requestObserver =
        asyncStub.routeChat(new StreamObserver() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
            if (testHelper != null) {
              testHelper.onMessage(note);
            }
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            if (testHelper != null) {
              testHelper.onRpcError(t);
            }
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 1),
              newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

  /** Issues several different requests and then exits. */
  public static void main(String[] args) throws InterruptedException {
    List features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    RouteGuideClient client = new RouteGuideClient("localhost", 8980);
    try {
      // Looking for a valid feature
      client.getFeature(409146138, -746188906);

      // Feature missing.
      client.getFeature(0, 0);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat can not finish within 1 minutes");
      }
    } finally {
      client.shutdown();
    }
  }

  private void info(String msg, Object... params) {
    logger.log(Level.INFO, msg, params);
  }

  private void warning(String msg, Object... params) {
    logger.log(Level.WARNING, msg, params);
  }

  private RouteNote newNote(String message, int lat, int lon) {
    return RouteNote.newBuilder().setMessage(message)
        .setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build();
  }

  /**
   * Only used for unit test, as we do not want to introduce randomness in unit test.
   */
  @VisibleForTesting
  void setRandom(Random random) {
    this.random = random;
  }

  /**
   * Only used for helping unit test.
   */
  @VisibleForTesting
  interface TestHelper {
    /**
     * Used for verify/inspect message received from server.
     */
    void onMessage(Message message);

    /**
     * Used for verify/inspect error received from server.
     */
    void onRpcError(Throwable exception);
  }

  @VisibleForTesting
  void setTestHelper(TestHelper testHelper) {
    this.testHelper = testHelper;
  }
}
サービスとクライアントの両方のstreammingは、上の2つの組み合わせです。
2まとめ
  gRPCサービスの呼び出しは同期と非同期方式をサポートしています。普通のRPCとstreammingモードもサポートしています。業務のニーズを最大限に満たすことができます。streammingモードでは、HTTP/2.0プロトコルの多重化機能を十分に利用して、HTTPリンク上でデータを並列に双方向に伝送することを実現し、HTTP/1.Xのデータ一方向転送問題を効果的に解決し、HTTP接続を大幅に低減する場合、単一リンクの性能を十分に利用し、従来のRPCプライベート接続プロトコルに匹敵する:より少ないリンク、より高い性能
  gRPCのネットワークI/O通信は、Netty構築に基づいて、サービス呼び出しの下層部に非同期方式を統一して使用し、同期呼び出しは非同期に基づいて上層パッケージを作成した。したがって、gRPCの非同期化は比較的徹底的であり、I/O密集型トラヒックのスループットと信頼性の向上に大きな助けとなる。
 次の章では、SSLに関するgrpcの部分について説明します。