Nettyベースの高性能JAVAのRPCフレームワーク
26675 ワード
前言
今年7月ごろ、アリババが組織した高性能ミドルウェア挑戦試合に応募したが、今回の試合はこれまでの試合とは違って、一つの工事の視点から試合をした.この試合には2つの試合問題があり、第1題はRPCフレームワークを実現し、第2題はMomメッセージミドルウェアを実現することである.RPCテーマは以下の通り
簡単なRPCフレームワークRPC(Remote Procedure Call):リモート・プロシージャ・コール.これは、ネットワークを介してリモート・コンピュータ・プログラムからサービスを要求し、下位ネットワーク技術を理解する必要がないプロトコルである.RPCプロトコルは、TCPまたはUDPのようないくつかのトランスポート・プロトコルの存在を仮定し、通信プログラム間に情報データを搬送する.OSIネットワーク通信モデルでは、RPCはトランスポート層とアプリケーション層を越えている.RPCは、ネットワーク分散型マルチプログラムを含むアプリケーションの送信が容易です.フレームワーク-プログラマがフレームワークが提供する機能を便利に使用することができ、RPCの特性のため、アプリケーションの分布式サービス化開発に焦点を当てているため、開発者に感知されないインタフェースエージェントとなり、明らかにRPCフレームワークの優れた設計である.テーマ要求1.フレームワークになるには:フレームワークの使用者に対してRPC実装を非表示にします.2.ネットワークモジュールは自分で作成することができ、IOフレームワークを使用する場合はnetty-4.0を使用する必要がある.23.Final. 3.非同期呼び出しをサポートし、future、callbackの能力を提供する.4.基本タイプ、カスタムビジネスタイプ、例外タイプ(クライアントに投げ出す).5.タイムアウトシーンを処理するには、サービス側の処理時間が長い場合、クライアントは指定時間内に今回の呼び出しを飛び出す.6.RPCコンテキストを提供し、クライアントはサービス側にデータを伝達することができる.7.Hookを提供し、開発者にRPCレベルのAOPを行う.注:第一題の難易度を下げるため、RPCフレームワークは登録センターを必要とせず、クライアントは認識する別の−DSSIPのJVMパラメータは、サービス側IPを取得する.測定基準はすべての要求を満たす.パフォーマンステスト.テスト時にrpc-use-demoのテスト例が実行され、テストのdemoパッケージはテストツールで作成されます.参加者はcomでなければならない.alibaba.middleware.race.rpc.api.impl.RpcConsumerImplは全クラス名でcomを継承する.alibaba.middleware.race.rpc.api.RpcConsumerは、すべてのpublicメソッドを上書きします.参加者はcomでなければならない.alibaba.middleware.race.rpc.api.impl.RpcProviderImplは全クラス名でcomを継承する.alibaba.middleware.race.rpc.api.RpcProviderは、すべてのpublicメソッドを上書きします.参加者はパブリックmavenセンターライブラリの3つのパケットに依存し、対応するパケット名に従ってdemoの例が表示されます.自分のプロジェクトで対応するクラス(パッケージ名、クラス名が一致する)を作成します.三方ライブラリのコードはヒントとして機能し、最終的なpomに依存しないように参考にすることができます.そのため、最終的な参加者はrpc-apiのjarパッケージを作成して、プロジェクト呼び出しをテストする必要があります.(rpc-apiの例を参照した後、pom依存から削除し、依存競合を回避してください)テストDemoエンジニアリングはTaocode SVNのコードを参照してください.
RPCの実装
タイトルの中で推薦するネットのフレームワークはNetty 4を使って実現して、このRPCのフレームワークの中で実現する必要があるのは1.RPCクライアント2.RPCサービス
RPCクライアントの実装
RPCクライアントとRPCサーバ側は同じインタフェースクラスを必要とし、RPCクライアントはプロキシクラスを通じてRPCサーバ側の関数を呼び出す.
consumerオブジェクトはエージェントによって生成されるので、consumerが呼び出すとinvoke関数が呼び出され、今回ローカルの関数呼び出しの情報をネットワークを介してRPCサーバに送信し、サーバから返される情報を待ってから返すことができます.
サーバ実装
RPCサーバは主にRPCクライアントを受信した後にRPC呼び出しのインタフェース名,関数名およびパラメータを解析する.
handel関数はJavaの反射メカニズムを通じて、呼び出すインタフェースクラスを見つけて対応する関数を呼び出して実行し、結果をクライアントに返し、今回のRPC呼び出しは終了します.
RPCの主要な実装クラスは私のgithubで見ることができて、私のこのRPCのフレームワークは完璧ではありませんが、性能はやはり良いサーバーの上でテストする時TPCは9 w+があります.主な最適化は、Neety 4というフレームワークとパケットの処理、データのシーケンス化と逆シーケンス化の速度githubアドレスを使用することです.https://github.com/zhujunxxxxx/
オリジナル宣言
作者の小竹zz本文の住所http://blog.csdn.net/zhujunxxxxx/article/details/48742529、転載する場合は出典を明記してください
今年7月ごろ、アリババが組織した高性能ミドルウェア挑戦試合に応募したが、今回の試合はこれまでの試合とは違って、一つの工事の視点から試合をした.この試合には2つの試合問題があり、第1題はRPCフレームワークを実現し、第2題はMomメッセージミドルウェアを実現することである.RPCテーマは以下の通り
簡単なRPCフレームワークRPC(Remote Procedure Call):リモート・プロシージャ・コール.これは、ネットワークを介してリモート・コンピュータ・プログラムからサービスを要求し、下位ネットワーク技術を理解する必要がないプロトコルである.RPCプロトコルは、TCPまたはUDPのようないくつかのトランスポート・プロトコルの存在を仮定し、通信プログラム間に情報データを搬送する.OSIネットワーク通信モデルでは、RPCはトランスポート層とアプリケーション層を越えている.RPCは、ネットワーク分散型マルチプログラムを含むアプリケーションの送信が容易です.フレームワーク-プログラマがフレームワークが提供する機能を便利に使用することができ、RPCの特性のため、アプリケーションの分布式サービス化開発に焦点を当てているため、開発者に感知されないインタフェースエージェントとなり、明らかにRPCフレームワークの優れた設計である.テーマ要求1.フレームワークになるには:フレームワークの使用者に対してRPC実装を非表示にします.2.ネットワークモジュールは自分で作成することができ、IOフレームワークを使用する場合はnetty-4.0を使用する必要がある.23.Final. 3.非同期呼び出しをサポートし、future、callbackの能力を提供する.4.基本タイプ、カスタムビジネスタイプ、例外タイプ(クライアントに投げ出す).5.タイムアウトシーンを処理するには、サービス側の処理時間が長い場合、クライアントは指定時間内に今回の呼び出しを飛び出す.6.RPCコンテキストを提供し、クライアントはサービス側にデータを伝達することができる.7.Hookを提供し、開発者にRPCレベルのAOPを行う.注:第一題の難易度を下げるため、RPCフレームワークは登録センターを必要とせず、クライアントは認識する別の−DSSIPのJVMパラメータは、サービス側IPを取得する.測定基準はすべての要求を満たす.パフォーマンステスト.テスト時にrpc-use-demoのテスト例が実行され、テストのdemoパッケージはテストツールで作成されます.参加者はcomでなければならない.alibaba.middleware.race.rpc.api.impl.RpcConsumerImplは全クラス名でcomを継承する.alibaba.middleware.race.rpc.api.RpcConsumerは、すべてのpublicメソッドを上書きします.参加者はcomでなければならない.alibaba.middleware.race.rpc.api.impl.RpcProviderImplは全クラス名でcomを継承する.alibaba.middleware.race.rpc.api.RpcProviderは、すべてのpublicメソッドを上書きします.参加者はパブリックmavenセンターライブラリの3つのパケットに依存し、対応するパケット名に従ってdemoの例が表示されます.自分のプロジェクトで対応するクラス(パッケージ名、クラス名が一致する)を作成します.三方ライブラリのコードはヒントとして機能し、最終的なpomに依存しないように参考にすることができます.そのため、最終的な参加者はrpc-apiのjarパッケージを作成して、プロジェクト呼び出しをテストする必要があります.(rpc-apiの例を参照した後、pom依存から削除し、依存競合を回避してください)テストDemoエンジニアリングはTaocode SVNのコードを参照してください.
RPCの実装
タイトルの中で推薦するネットのフレームワークはNetty 4を使って実現して、このRPCのフレームワークの中で実現する必要があるのは1.RPCクライアント2.RPCサービス
RPCクライアントの実装
RPCクライアントとRPCサーバ側は同じインタフェースクラスを必要とし、RPCクライアントはプロキシクラスを通じてRPCサーバ側の関数を呼び出す.
RpcConsumerImpl
......
package com.alibaba.middleware.race.rpc.api.impl;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.middleware.race.rpc.aop.ConsumerHook;
import com.alibaba.middleware.race.rpc.api.RpcConsumer;
import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.netty.RpcConnection;
import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;
import com.alibaba.middleware.race.rpc.tool.Tool;
public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {
private static AtomicLong callTimes = new AtomicLong(0L);
private RpcConnection connection;
private List connection_list;
private Map asyncMethods;
private Class> interfaceClass;
private String version;
private int timeout;
private ConsumerHook hook;
public Class> getInterfaceClass() {
return interfaceClass;
}
public String getVersion() {
return version;
}
public int getTimeout() {
this.connection.setTimeOut(timeout);
return timeout;
}
public ConsumerHook getHook() {
return hook;
}
RpcConnection select()
{
//Random rd=new Random(System.currentTimeMillis());
int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1));
if(d==0)
return connection;
else
{
return connection_list.get(d-1);
}
}
public RpcConsumerImpl()
{
//String ip=System.getProperty("SIP");
String ip="127.0.0.1";
this.asyncMethods=new HashMap();
this.connection=new RpcNettyConnection(ip,8888);
this.connection.connect();
connection_list=new ArrayList();
int num=Runtime.getRuntime().availableProcessors()/3 -2;
for (int i = 0; i < num; i++) {
connection_list.add(new RpcNettyConnection(ip, 8888));
}
for (RpcConnection conn:connection_list)
{
conn.connect();
}
}
public void destroy() throws Throwable {
if (null != connection) {
connection.close();
}
}
@SuppressWarnings("unchecked")
public T proxy(Class interfaceClass) throws Throwable {
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException(interfaceClass.getName()
+ " is not an interface");
}
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class>[] { interfaceClass }, this);
}
@Override
public RpcConsumer interfaceClass(Class> interfaceClass) {
// TODO Auto-generated method stub
this.interfaceClass=interfaceClass;
return this;
}
@Override
public RpcConsumer version(String version) {
// TODO Auto-generated method stub
this.version=version;
return this;
}
@Override
public RpcConsumer clientTimeout(int clientTimeout) {
// TODO Auto-generated method stub
this.timeout=clientTimeout;
return this;
}
@Override
public RpcConsumer hook(ConsumerHook hook) {
// TODO Auto-generated method stub
this.hook=hook;
return this;
}
@Override
public Object instance() {
// TODO Auto-generated method stub
try {
return proxy(this.interfaceClass);
}
catch (Throwable e)
{
e.printStackTrace();
}
return null;
}
@Override
public void asynCall(String methodName) {
// TODO Auto-generated method stub
asynCall(methodName, null);
}
@Override
public void asynCall(
String methodName, T callbackListener) {
this.asyncMethods.put(methodName, callbackListener);
this.connection.setAsyncMethod(asyncMethods);
for (RpcConnection conn:connection_list)
{
conn.setAsyncMethod(asyncMethods);
}
}
@Override
public void cancelAsyn(String methodName) {
// TODO Auto-generated method stub
this.asyncMethods.remove(methodName);
this.connection.setAsyncMethod(asyncMethods);
for (RpcConnection conn:connection_list)
{
conn.setAsyncMethod(asyncMethods);
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
// TODO Auto-generated method stub
List parameterTypes = new LinkedList();
for (Class> parameterType : method.getParameterTypes()) {
parameterTypes.add(parameterType.getName());
}
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
if(hook!=null)
hook.before(request);
RpcResponse response = null;
try
{
request.setContext(RpcContext.props);
response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName()));
if(hook!=null)
hook.after(request);
if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null)
{
Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz());
throw e.getCause();
}
}
catch (Throwable t)
{
//t.printStackTrace();
//throw new RuntimeException(t);
throw t;
}
finally
{
// if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)
// {
// cancelAsyn(request.getMethodName());
// }
}
if(response==null)
{
return null;
}
else if (response.getErrorMsg() != null)
{
throw response.getErrorMsg();
}
else
{
return response.getAppResponse();
}
}
}
RpcConsumer consumer;
consumer = (RpcConsumer) getConsumerImplClass().newInstance();
consumer.someMethod();
consumerオブジェクトはエージェントによって生成されるので、consumerが呼び出すとinvoke関数が呼び出され、今回ローカルの関数呼び出しの情報をネットワークを介してRPCサーバに送信し、サーバから返される情報を待ってから返すことができます.
サーバ実装
RPCサーバは主にRPCクライアントを受信した後にRPC呼び出しのインタフェース名,関数名およびパラメータを解析する.
package com.alibaba.middleware.race.rpc.api.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;
import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;
import com.alibaba.middleware.race.rpc.tool.ReflectionCache;
import com.alibaba.middleware.race.rpc.tool.Tool;
/**
* RPC
* @author sei.zz
*
*/
public class RpcRequestHandler extends ChannelInboundHandlerAdapter {
// ID RpcContext Map;
private static Map> ThreadLocalMap=new HashMap>();
// -
private final Map handlerMap;
KryoSerialization kryo=new KryoSerialization();
public RpcRequestHandler(Map handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("active");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
System.out.println("disconnected");
}
// RpcContext
private void UpdateRpcContext(String host,Map map)
{
if(ThreadLocalMap.containsKey(host))
{
Map local=ThreadLocalMap.get(host);
local.putAll(map);//
ThreadLocalMap.put(host, local);//
for(Map.Entry entry:map.entrySet()){ //
RpcContext.addProp(entry.getKey(), entry.getValue());
}
}
else
{
ThreadLocalMap.put(host, map);
// Context
for(Map.Entry entry:map.entrySet()){
RpcContext.addProp(entry.getKey(), entry.getValue());
}
}
}
//
private static Object cacheName=null;
private static Object cacheVaule=null;
@Override
public void channelRead(
ChannelHandlerContext ctx, Object msg) throws Exception {
RpcRequest request=(RpcRequest)msg;
String host=ctx.channel().remoteAddress().toString();
//
UpdateRpcContext(host,request.getContext());
//TODO
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try
{
Object result = handle(request);
if(cacheName!=null&&cacheName.equals(result))
{
response.setAppResponse(cacheVaule);
}
else
{
response.setAppResponse(ByteObjConverter.ObjectToByte(result));
cacheName=result;
cacheVaule=ByteObjConverter.ObjectToByte(result);
}
}
catch (Throwable t)
{
//response.setErrorMsg(t);
response.setExption(Tool.serialize(t));
response.setClazz(t.getClass());
}
ctx.writeAndFlush(response);
}
/**
*
* @param request
* @return
* @throws Throwable
*/
private static RpcRequest methodCacheName=null;
private static Object methodCacheValue=null;
private Object handle(RpcRequest request) throws Throwable
{
String className = request.getClassName();
Object classimpl = handlerMap.get(className);//
Class> clazz = classimpl.getClass();
String methodName = request.getMethodName();
Class>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes);
// method.setAccessible(true);
//System.out.println(className+":"+methodName+":"+parameters.length);
if(methodCacheName!=null&&methodCacheName.equals(request))
{
return methodCacheValue;
}
else
{
try
{
methodCacheName=request;
if(methodMap.containsKey(methodName))
{
methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters);
return methodCacheValue;
}
else
{
FastClass serviceFastClass = FastClass.create(clazz);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
methodMap.put(methodName, serviceFastMethod);
methodCacheValue= serviceFastMethod.invoke(classimpl, parameters);
return methodCacheValue;
}
//return method.invoke(classimpl, parameters);
}
catch (Throwable e)
{
throw e.getCause();
}
}
}
private Map methodMap=new HashMap();
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
//ctx.close();
//cause.printStackTrace();
ctx.close();
}
}
handel関数はJavaの反射メカニズムを通じて、呼び出すインタフェースクラスを見つけて対応する関数を呼び出して実行し、結果をクライアントに返し、今回のRPC呼び出しは終了します.
RPCの主要な実装クラスは私のgithubで見ることができて、私のこのRPCのフレームワークは完璧ではありませんが、性能はやはり良いサーバーの上でテストする時TPCは9 w+があります.主な最適化は、Neety 4というフレームワークとパケットの処理、データのシーケンス化と逆シーケンス化の速度githubアドレスを使用することです.https://github.com/zhujunxxxxx/
オリジナル宣言
作者の小竹zz本文の住所http://blog.csdn.net/zhujunxxxxx/article/details/48742529、転載する場合は出典を明記してください