0から1までRPCフレームワークを構築
Cool-Rpc
前言
このブログのプロジェクトコードはgithubでオープンソースになっています.皆さん、一緒に貢献してください.点此进入:Cool-RPC
最近ブログを書くのはまだ17年末で、長い間注目してくれてありがとう.本編のブログは0から1まで、簡単で効率的で開拓性の強いrpcフレームワークを構築する方法を教えてくれます.
RPCとは
アリのDubbo、グーグルのgrpc、FacebookのThriftなど、RPCフレームワークを多かれ少なかれ使用したことがあると信じています.
では、いったいrpcとは何ですか.rpcは中国語に訳して遠隔プロセス呼び出しと呼ばれ、分かりやすい点:単一のアプリケーションフレームを分布式システムアーキテクチャを構成した後、複数のシステム間のデータがどのように相互作用するか、これがrpcの職責である.
サービスの観点から、rpcはサービスプロバイダ(provider)とサービス消費者(consumer)の2つのクラスに分けられ、中間にはjavaインタフェースを共有するものがあり、オープンapiインタフェースと呼ばれる.つまり、インタフェースサービス実装クラスが置かれている場所をproviderと呼び、インタフェースサービス呼び出しクラスが置かれている場所をconsumerと呼ぶ.
分散環境にあるため、consumerがproviderを呼び出すとき、相手のサーバのIPとオープンポートをどのように知るのでしょうか.この場合、登録センターというコンポーネントが必要です.consumerはサービス名を通して、登録センターでそのサービスのIP+Portを検索し、アドレスデータを取得してから、そのアドレスのサービスを要求します.
図:
Cool-rpc技術概要
この項目はトランスポート層(TCP/IPプロトコル)に基づいて通信を行い、トランスポート層フレームワークはnettyで作成され、githubにはminaバージョンで複数のシーケンス化フレームワークが提供され、デフォルトではProtostuffシーケンス化が使用され、javaシーケンス化などの登録センターデフォルトzookeeperを使用するように構成でき、redis(ノードデータストレージとメッセージ通知機能のあるコンポーネントのみ)を使用するように構成できる
consumerはjavaダイナミックエージェント方式でリモートコールを実行して実行するクラス名、方法、パラメータなどをproviderに通知し、その後providerはデータを持ってローカル実装クラスを呼び出し、処理後の結果をconsumerに通知する
登録センター
くだらないことをたくさん言って、乾物を始めて、githubから完全なコードをクローンすることを提案して、本編の博文は重点のコードだけを話します
登録センターはapiインタフェース名key、IP+Portをvalueとし、消費者のクエリー呼び出しのためにデータを永続化します.
zookeeperを例に挙げると、
サービス登録者と発見者をより柔軟に実現するために、ここに登録センターアダプタを追加します.
このブログのTCPデータ(コーデック、プロセッサを含む)はすべてnettyで記述されています.
サービス側のnettyブートクラス:
サービス・エンド・プロセッサ(netty handler):
クライアントTCPプロセッサ
consumerはapiインタフェースのみであり、その実装クラスはないのでjavaダイナミックエージェント方式でメソッド実装をカスタマイズすることができ、エージェントのメソッド実装はTCP握手接続を確立し、providerがメソッドを実行し、得られた結果をエージェントクラスに返すことで、インタフェースだけで実装クラスメソッドを呼び出すことができる仮象をもたらす.
ステップ1:javaダイナミックエージェントnewを使用してエージェントオブジェクトをエクスポートする
クライアントブートクラス:
終わりの言葉
以上はCool-rpcの简単な说明で、もしもっと良い考えがあれば私に连络して热烈にみんなを歓迎して一绪にこのプロジェクトCool-RPCを维持します
前言
このブログのプロジェクトコードはgithubでオープンソースになっています.皆さん、一緒に貢献してください.点此进入:Cool-RPC
最近ブログを書くのはまだ17年末で、長い間注目してくれてありがとう.本編のブログは0から1まで、簡単で効率的で開拓性の強いrpcフレームワークを構築する方法を教えてくれます.
RPCとは
アリのDubbo、グーグルのgrpc、FacebookのThriftなど、RPCフレームワークを多かれ少なかれ使用したことがあると信じています.
では、いったいrpcとは何ですか.rpcは中国語に訳して遠隔プロセス呼び出しと呼ばれ、分かりやすい点:単一のアプリケーションフレームを分布式システムアーキテクチャを構成した後、複数のシステム間のデータがどのように相互作用するか、これがrpcの職責である.
サービスの観点から、rpcはサービスプロバイダ(provider)とサービス消費者(consumer)の2つのクラスに分けられ、中間にはjavaインタフェースを共有するものがあり、オープンapiインタフェースと呼ばれる.つまり、インタフェースサービス実装クラスが置かれている場所をproviderと呼び、インタフェースサービス呼び出しクラスが置かれている場所をconsumerと呼ぶ.
分散環境にあるため、consumerがproviderを呼び出すとき、相手のサーバのIPとオープンポートをどのように知るのでしょうか.この場合、登録センターというコンポーネントが必要です.consumerはサービス名を通して、登録センターでそのサービスのIP+Portを検索し、アドレスデータを取得してから、そのアドレスのサービスを要求します.
図:
Cool-rpc技術概要
この項目はトランスポート層(TCP/IPプロトコル)に基づいて通信を行い、トランスポート層フレームワークはnettyで作成され、githubにはminaバージョンで複数のシーケンス化フレームワークが提供され、デフォルトではProtostuffシーケンス化が使用され、javaシーケンス化などの登録センターデフォルトzookeeperを使用するように構成でき、redis(ノードデータストレージとメッセージ通知機能のあるコンポーネントのみ)を使用するように構成できる
consumerはjavaダイナミックエージェント方式でリモートコールを実行して実行するクラス名、方法、パラメータなどをproviderに通知し、その後providerはデータを持ってローカル実装クラスを呼び出し、処理後の結果をconsumerに通知する
登録センター
くだらないことをたくさん言って、乾物を始めて、githubから完全なコードをクローンすることを提案して、本編の博文は重点のコードだけを話します
登録センターはapiインタフェース名key、IP+Portをvalueとし、消費者のクエリー呼び出しのためにデータを永続化します.
zookeeperを例に挙げると、
サービス登録者と発見者をより柔軟に実現するために、ここに登録センターアダプタを追加します.
public abstract class ServiceCenterAdapter implements ServiceCenter{
String host;
int port = 0;
String passWord;
ServiceCenterAdapter(){}
ServiceCenterAdapter(String host){
this.host = host;
}
ServiceCenterAdapter(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public String discover(String serviceName) {
return null;
}
@Override
public void register(String serviceName, String serviceAddress) {}
@Override
public void setHost(String host){
this.host = host;
};
@Override
public void setPort(int port){
this.port = port;
};
@Override
public void setPassWord(String passWord){
this.passWord = passWord;
};
// IP:
@Override
public String getAddress(){
if ("".equals(host) || host == null || port == 0){
throw new RuntimeException("the zookeeper host or port error");
}
return host+":"+String.valueOf(port);
};
}
zookeeperのサービス登録(provider使用):実際のプロジェクトでは、このような構造を構築し、対応するIPとポートを注入し、最後にbeanの形でIOCコンテナに注入する必要があるpublic class ZooKeeperServiceRegistry extends ServiceCenterAdapter {
private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class);
private ZkClient zkClient;
{
this.port = 2181;
zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
log.info("connect zookeeper");
}
public ZooKeeperServiceRegistry(String zkHost) {
super(zkHost);
}
public ZooKeeperServiceRegistry(String zkHost, int zkPort) {
super(zkHost, zkPort);
}
// serviceName= serviceAddress=IP+Port
@Override
public void register(String serviceName, String serviceAddress) {
// create cool node permanent
String registryPath = CoolConstant.ZK_REGISTRY_PATH;
if (!zkClient.exists(registryPath)) {
zkClient.createPersistent(registryPath);
log.info("create registry node: {}", registryPath);
}
// create service node permanent
String servicePath = registryPath + "/" + serviceName;
if (!zkClient.exists(servicePath)) {
zkClient.createPersistent(servicePath);
log.info("create service node: {}", servicePath);
}
// create service address node temp
String addressPath = servicePath + "/address-";
String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
log.info("create address node: {}", addressNode);
}
}
zookeeperのサービス発見者(consumer使用):同様に、対応するIPとポートを構成し、beanでプロジェクトiocコンテナに注入する必要があります.public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter {
private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class);
{
super.port = 2181;
}
public ZooKeeperServiceDiscovery(){};
public ZooKeeperServiceDiscovery(String zkHost){
super(zkHost);
}
public ZooKeeperServiceDiscovery(String zkHost, int zkPort){
super(zkHost, zkPort);
}
// name=api
@Override
public String discover(String name) {
ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT);
log.debug("connect zookeeper");
try {
String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name;
if (!zkClient.exists(servicePath)) {
throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
}
List addressList = zkClient.getChildren(servicePath);
if (addressList.size() == 0) {
throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
}
String address;
int size = addressList.size();
if (size == 1) {
address = addressList.get(0);
log.debug("get only address node: {}", address);
} else {
address = addressList.get(ThreadLocalRandom.current().nextInt(size));
log.debug("get random address node: {}", address);
}
String addressPath = servicePath + "/" + address;
return zkClient.readData(addressPath);
} finally {
zkClient.close();
}
}
}
サービス側TCPプロセッサこのブログのTCPデータ(コーデック、プロセッサを含む)はすべてnettyで記述されています.
サービス側のnettyブートクラス:
public class CoolRpcServer implements ApplicationContextAware {
private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class);
private Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap bootstrap;
private HandlerInitializer handlerInitializer;
private ServiceCenter serviceRegistry;
private String serviceIP;
private int port;
public static Map servicesMap ;
{
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
handlerInitializer = new HandlerInitializer();
servicesMap = new HashMap<>(16);
}
public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port){
this.serviceRegistry = serviceRegistry;
this.serviceIP = serviceIP;
this.port = port;
}
/**
* start and init tcp server if ioc contain is booting
*/
@SuppressWarnings("unchecked")
public void initServer() throws InterruptedException {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(handlerInitializer);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// the most send bytes ( 256KB )
bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256);
// the most receive bytes ( 2048KB )
bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2);
channel = bootstrap.bind(serviceIP,port).sync().channel();
if (servicesMap != null && servicesMap.size() > 0){
for (String beanName: servicesMap.keySet()){
serviceRegistry.register(beanName, serviceIP + ":" + String.valueOf(port));
log.info("register service name = {}", beanName);
}
}
log.info("TCP server started successfully, port:{}", port);
channel.closeFuture().sync();
}
/**
* close ioc contain and stop tcp server
*/
public void stopServer(){
if (channel != null && channel.isActive()) {
channel.close();
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
log.info("TCP server stopped successfully, port: {}", port);
}
/**
* scan Annotation of CoolService
*/
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
Map beans = ctx.getBeansWithAnnotation(CoolService.class);
if (beans != null && beans.size()>0){
for (Object bean : beans.values()){
String name = bean.getClass().getAnnotation(CoolService.class).value().getName();
servicesMap.put(name, bean);
}
}
}
}
このプロジェクトのオープンapiインタフェース実装クラスは@CoolService
注釈で識別する必要があり、サービス側コンテナが起動すると、この注釈付き実装クラスはすべてスキャンされ、登録センターに注入されます.サービス・エンド・プロセッサ(netty handler):
@ChannelHandler.Sharable
public class CoolServerHandler extends ChannelInboundHandlerAdapter {
private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CoolResponse response = new CoolResponse();
CoolRequest request = (CoolRequest) msg;
try {
Object result = invoke(request);
response.setRequestID(request.getRequestID());
response.setResult(result);
} catch (Throwable error) {
response.setError(error);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private Object invoke(CoolRequest request) throws Throwable{
if (request == null){
throw new Throwable("cool rpc request not found");
}
String className = request.getClassName();
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
Object service = CoolRpcServer.servicesMap.get(className);
if (service == null){
throw new Throwable("cool rpc service not exist");
}
Class> serviceClass = service.getClass();
Class>[] parameterTypes = request.getParameterTypes();
FastClass fastClass = FastClass.create(serviceClass);
FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
return fastMethod.invoke(service, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("server caught exception", cause);
ctx.close();
}
}
クライアントから送信された要求データ(クラス名,メソッド,パラメータ)をローカルでcglibで呼び出し呼び出しの反射に成功した後,処理済みの結果符号化をクライアントに返し,TCP接続を閉じるクライアントTCPプロセッサ
consumerはapiインタフェースのみであり、その実装クラスはないのでjavaダイナミックエージェント方式でメソッド実装をカスタマイズすることができ、エージェントのメソッド実装はTCP握手接続を確立し、providerがメソッドを実行し、得られた結果をエージェントクラスに返すことで、インタフェースだけで実装クラスメソッドを呼び出すことができる仮象をもたらす.
ステップ1:javaダイナミックエージェントnewを使用してエージェントオブジェクトをエクスポートする
public class CoolProxy {
private static Logger log = LoggerFactory.getLogger(CoolProxy.class);
private ServiceCenter serviceDiscovery;
public CoolProxy(ServiceCenter serviceDiscovery){
this.serviceDiscovery = serviceDiscovery;
}
@SuppressWarnings("unchecked")
public T getInstance(Class cls){
return (T)Proxy.newProxyInstance(cls.getClassLoader(),
new Class>[]{cls},
(proxy, method, args) -> {
CoolRequest request = new CoolRequest();
request.setRequestID(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setParameterTypes(method.getParameterTypes());
String[] addr = serviceDiscovery.discover(cls.getName()).split(":",2);
CoolRpcClient client = new CoolRpcClient(addr[0],
Integer.parseInt(addr[1]));
CoolResponse response = client.send(request);
if (response.getError()!=null){
throw response.getError();
} else {
return response.getResult();
}
});
}
}
ステップ2:エージェントメソッドで、リモート・プロシージャ・コール(rpc)を使用クライアントブートクラス:
public class CoolRpcClient {
private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class);
private CountDownLatch countDownLatch;
private EventLoopGroup group;
private Bootstrap bootstrap;
private CoolResponse response;
private String serviceIP;
private int port;
{
response = new CoolResponse();
countDownLatch = new CountDownLatch(1);
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
}
public CoolRpcClient(String serviceIP, int port){
this.serviceIP = serviceIP;
this.port = port;
}
public CoolResponse send(CoolRequest request){
try {
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new CoolRpcDecoder(CoolResponse.class))
.addLast(new CoolRpcEncoder(CoolRequest.class))
.addLast(new CoolClientHandler(countDownLatch, response));
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
Channel channel = bootstrap.connect(serviceIP, port).sync().channel();
channel.writeAndFlush(request).sync();
countDownLatch.await();
channel.closeFuture().sync();
return response;
} catch (Exception e){
e.printStackTrace();
return null;
} finally {
group.shutdownGracefully();
}
}
}
クライアントプロセッサ(handler):@ChannelHandler.Sharable
public class CoolClientHandler extends ChannelInboundHandlerAdapter {
private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class);
private CountDownLatch latch;
private CoolResponse response;
public CoolClientHandler(CountDownLatch latch, CoolResponse response){
this.latch = latch;
this.response = response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CoolResponse enResponse = (CoolResponse) msg;
this.response.sync(enResponse);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("api caught exception", cause);
ctx.close();
}
}
最後にCountDownLatch同期を使用して呼び出し者に通知し、rpc呼び出しが完了した.終わりの言葉
以上はCool-rpcの简単な说明で、もしもっと良い考えがあれば私に连络して热烈にみんなを歓迎して一绪にこのプロジェクトCool-RPCを维持します