Nettyベースの高性能JAVAのRPCフレームワーク


前言
今年7月ごろ、アリババが組織した高性能ミドルウェア挑戦試合に応募したが、今回の試合はこれまでの試合とは違って、一つの工事の視点から試合をした.この試合には2つの試合問題があり、第1題はRPCフレームワークを実現し、第2題はMomメッセージミドルウェアを実現することである.RPCテーマは以下の通り
簡単なRPCフレームワークRPC(Remote Procedure Call):リモート・プロシージャ・コール.これは、ネットワークを介してリモート・コンピュータ・プログラムからサービスを要求し、下位ネットワーク技術を理解する必要がないプロトコルである.RPCプロトコルは、TCPまたはUDPのようないくつかのトランスポート・プロトコルの存在を仮定し、通信プログラム間に情報データを搬送する.OSIネットワーク通信モデルでは、RPCはトランスポート層とアプリケーション層を越えている.RPCは、ネットワーク分散型マルチプログラムを含むアプリケーションの送信が容易です.フレームワーク-プログラマがフレームワークが提供する機能を便利に使用することができ、RPCの特性のため、アプリケーションの分布式サービス化開発に焦点を当てているため、開発者に感知されないインタフェースエージェントとなり、明らかにRPCフレームワークの優れた設計である.テーマ要求1.フレームワークになるには:フレームワークの使用者に対してRPC実装を非表示にします.2.ネットワークモジュールは自分で作成することができ、IOフレームワークを使用する場合はnetty-4.0を使用する必要がある.23.Final. 3.非同期呼び出しをサポートし、future、callbackの能力を提供する.4.基本タイプ、カスタムビジネスタイプ、例外タイプ(クライアントに投げ出す).5.タイムアウトシーンを処理するには、サービス側の処理時間が長い場合、クライアントは指定時間内に今回の呼び出しを飛び出す.6.RPCコンテキストを提供し、クライアントはサービス側にデータを伝達することができる.7.Hookを提供し、開発者にRPCレベルのAOPを行う.注:第一題の難易度を下げるため、RPCフレームワークは登録センターを必要とせず、クライアントは認識する別の−DSSIPのJVMパラメータは、サービス側IPを取得する.測定基準はすべての要求を満たす.パフォーマンステスト.テスト時にrpc-use-demoのテスト例が実行され、テストのdemoパッケージはテストツールで作成されます.参加者はcomでなければならない.alibaba.middleware.race.rpc.api.impl.RpcConsumerImplは全クラス名でcomを継承する.alibaba.middleware.race.rpc.api.RpcConsumerは、すべてのpublicメソッドを上書きします.参加者はcomでなければならない.alibaba.middleware.race.rpc.api.impl.RpcProviderImplは全クラス名でcomを継承する.alibaba.middleware.race.rpc.api.RpcProviderは、すべてのpublicメソッドを上書きします.参加者はパブリックmavenセンターライブラリの3つのパケットに依存し、対応するパケット名に従ってdemoの例が表示されます.自分のプロジェクトで対応するクラス(パッケージ名、クラス名が一致する)を作成します.三方ライブラリのコードはヒントとして機能し、最終的なpomに依存しないように参考にすることができます.そのため、最終的な参加者はrpc-apiのjarパッケージを作成して、プロジェクト呼び出しをテストする必要があります.(rpc-apiの例を参照した後、pom依存から削除し、依存競合を回避してください)テストDemoエンジニアリングはTaocode SVNのコードを参照してください.
RPCの実装
タイトルの中で推薦するネットのフレームワークはNetty 4を使って実現して、このRPCのフレームワークの中で実現する必要があるのは1.RPCクライアント2.RPCサービス
RPCクライアントの実装
RPCクライアントとRPCサーバ側は同じインタフェースクラスを必要とし、RPCクライアントはプロキシクラスを通じてRPCサーバ側の関数を呼び出す.
RpcConsumerImpl   
......
package com.alibaba.middleware.race.rpc.api.impl;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;


import com.alibaba.middleware.race.rpc.aop.ConsumerHook;
import com.alibaba.middleware.race.rpc.api.RpcConsumer;
import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.netty.RpcConnection;
import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;
import com.alibaba.middleware.race.rpc.tool.Tool;


public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {

    private static AtomicLong callTimes = new AtomicLong(0L);
    private RpcConnection connection;
    private List connection_list;
    private Map asyncMethods;
    private Class> interfaceClass;

    private String version;

    private int timeout;

    private ConsumerHook hook;

    public Class> getInterfaceClass() {
        return interfaceClass;
    }
    public String getVersion() {
        return version;
    }
    public int getTimeout() {
        this.connection.setTimeOut(timeout);
        return timeout;
    }
    public ConsumerHook getHook() {
        return hook;
    }
    RpcConnection select()
    {
        //Random rd=new Random(System.currentTimeMillis());
        int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1));
        if(d==0)
            return connection;
        else
        {
            return connection_list.get(d-1);
        }
    }
    public RpcConsumerImpl()
    {
        //String ip=System.getProperty("SIP");
        String ip="127.0.0.1";
        this.asyncMethods=new HashMap();
        this.connection=new RpcNettyConnection(ip,8888);
        this.connection.connect();
        connection_list=new ArrayList();
        int num=Runtime.getRuntime().availableProcessors()/3 -2;
        for (int i = 0; i < num; i++) {
            connection_list.add(new RpcNettyConnection(ip, 8888));
        }
        for (RpcConnection conn:connection_list) 
        {
            conn.connect();
        }

    }
    public void destroy() throws Throwable {
        if (null != connection) {
            connection.close();
        }
    }

    @SuppressWarnings("unchecked")
    public  T proxy(Class interfaceClass) throws Throwable {
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException(interfaceClass.getName()
                    + " is not an interface");
        }
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class>[] { interfaceClass }, this);
    }
    @Override
    public RpcConsumer interfaceClass(Class> interfaceClass) {
        // TODO Auto-generated method stub
        this.interfaceClass=interfaceClass;
        return this;
    }

    @Override
    public RpcConsumer version(String version) {
        // TODO Auto-generated method stub
        this.version=version;
        return this;
    }

    @Override
    public RpcConsumer clientTimeout(int clientTimeout) {
        // TODO Auto-generated method stub
        this.timeout=clientTimeout;
        return this;
    }

    @Override
    public RpcConsumer hook(ConsumerHook hook) {
        // TODO Auto-generated method stub
        this.hook=hook;
        return this;
    }

    @Override
    public Object instance() {
        // TODO Auto-generated method stub
        try {
            return proxy(this.interfaceClass);
        }
        catch (Throwable e) 
        {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void asynCall(String methodName) {
        // TODO Auto-generated method stub
         asynCall(methodName, null);
    }

    @Override
    public  void asynCall(
            String methodName, T callbackListener) {

        this.asyncMethods.put(methodName, callbackListener);
        this.connection.setAsyncMethod(asyncMethods);

        for (RpcConnection conn:connection_list) 
        {
            conn.setAsyncMethod(asyncMethods);
        }
    }

    @Override
    public void cancelAsyn(String methodName) {
        // TODO Auto-generated method stub
        this.asyncMethods.remove(methodName);
        this.connection.setAsyncMethod(asyncMethods);
        for (RpcConnection conn:connection_list) 
        {
            conn.setAsyncMethod(asyncMethods);
        }
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        // TODO Auto-generated method stub
        List parameterTypes = new LinkedList();
        for (Class> parameterType : method.getParameterTypes()) {
            parameterTypes.add(parameterType.getName());
        }
        RpcRequest request = new RpcRequest();
        request.setRequestId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameters(args);
        if(hook!=null)
            hook.before(request);
        RpcResponse response = null;
        try
        {
            request.setContext(RpcContext.props);
            response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName()));
            if(hook!=null)
                hook.after(request);

            if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null)
            {

                Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz());
                throw e.getCause();
            }
        }
        catch (Throwable t)
        {   
            //t.printStackTrace();
            //throw new RuntimeException(t);
            throw t;
        }
        finally
        {
//          if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)
//          {
//              cancelAsyn(request.getMethodName());
//          }
        }
        if(response==null)
        {
            return null;
        }
        else if (response.getErrorMsg() != null) 
        {
            throw response.getErrorMsg();
        } 
        else 
        {
            return response.getAppResponse();
        }

    }
}
RpcConsumer consumer;
consumer = (RpcConsumer) getConsumerImplClass().newInstance();
consumer.someMethod();

consumerオブジェクトはエージェントによって生成されるので、consumerが呼び出すとinvoke関数が呼び出され、今回ローカルの関数呼び出しの情報をネットワークを介してRPCサーバに送信し、サーバから返される情報を待ってから返すことができます.
サーバ実装
RPCサーバは主にRPCクライアントを受信した後にRPC呼び出しのインタフェース名,関数名およびパラメータを解析する.
package com.alibaba.middleware.race.rpc.api.impl;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;
import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;
import com.alibaba.middleware.race.rpc.tool.ReflectionCache;
import com.alibaba.middleware.race.rpc.tool.Tool;


/**
 *         RPC       
 * @author sei.zz
 *
 */
public class RpcRequestHandler extends ChannelInboundHandlerAdapter {

    //      ID         RpcContext Map;
    private static Map> ThreadLocalMap=new HashMap>();
    //     -       
    private final Map handlerMap;
    KryoSerialization kryo=new KryoSerialization();
    public RpcRequestHandler(Map handlerMap) {
        this.handlerMap = handlerMap;
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("active");
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("disconnected");
    }
    //  RpcContext   
    private void UpdateRpcContext(String host,Map map)
    {
        if(ThreadLocalMap.containsKey(host))
        {
            Map local=ThreadLocalMap.get(host);
            local.putAll(map);//        
            ThreadLocalMap.put(host, local);//   
            for(Map.Entry entry:map.entrySet()){ //    
                RpcContext.addProp(entry.getKey(), entry.getValue());
            }
        }
        else
        {
            ThreadLocalMap.put(host, map);
            //      Context  
            for(Map.Entry entry:map.entrySet()){ 
                  RpcContext.addProp(entry.getKey(), entry.getValue());
            }
        }

    }
      //             
      private static Object cacheName=null;
      private static Object cacheVaule=null;

      @Override
      public void channelRead(
          ChannelHandlerContext ctx, Object msg) throws Exception {
          RpcRequest request=(RpcRequest)msg;
          String host=ctx.channel().remoteAddress().toString();
          //     
          UpdateRpcContext(host,request.getContext());
          //TODO                             
          RpcResponse response = new RpcResponse();
          response.setRequestId(request.getRequestId());
          try 
          {
              Object result = handle(request);
              if(cacheName!=null&&cacheName.equals(result))
              {
                  response.setAppResponse(cacheVaule);
              }
              else
              {
                  response.setAppResponse(ByteObjConverter.ObjectToByte(result));
                  cacheName=result;
                  cacheVaule=ByteObjConverter.ObjectToByte(result);
              }
          } 
          catch (Throwable t) 
          {
              //response.setErrorMsg(t);
              response.setExption(Tool.serialize(t));
              response.setClazz(t.getClass());
          }
          ctx.writeAndFlush(response);
      }

      /**
          *            
          * @param request
          * @return
          * @throws Throwable
          */
     private static RpcRequest methodCacheName=null;
     private static Object  methodCacheValue=null;
     private Object handle(RpcRequest request) throws Throwable 
     {
         String className = request.getClassName();

         Object classimpl = handlerMap.get(className);//          

         Class> clazz = classimpl.getClass();

         String methodName = request.getMethodName();

         Class>[] parameterTypes = request.getParameterTypes();

         Object[] parameters = request.getParameters();

//       Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes);
//       method.setAccessible(true);

         //System.out.println(className+":"+methodName+":"+parameters.length);
         if(methodCacheName!=null&&methodCacheName.equals(request))
         {
             return methodCacheValue;
         }
         else
         {
             try 
             {
                 methodCacheName=request;
                 if(methodMap.containsKey(methodName))
                 {
                     methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters);
                     return methodCacheValue;
                 }
                 else
                 {
                     FastClass serviceFastClass = FastClass.create(clazz);
                     FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
                     methodMap.put(methodName, serviceFastMethod);
                     methodCacheValue= serviceFastMethod.invoke(classimpl, parameters);
                     return methodCacheValue;
                 }
                 //return method.invoke(classimpl, parameters);
             }
             catch (Throwable e) 
             {
                 throw e.getCause();
             }
         }
     }
      private Map methodMap=new HashMap();
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
      }

      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
      {
          //ctx.close();
          //cause.printStackTrace();
          ctx.close();
      }
    }

handel関数はJavaの反射メカニズムを通じて、呼び出すインタフェースクラスを見つけて対応する関数を呼び出して実行し、結果をクライアントに返し、今回のRPC呼び出しは終了します.
RPCの主要な実装クラスは私のgithubで見ることができて、私のこのRPCのフレームワークは完璧ではありませんが、性能はやはり良いサーバーの上でテストする時TPCは9 w+があります.主な最適化は、Neety 4というフレームワークとパケットの処理、データのシーケンス化と逆シーケンス化の速度githubアドレスを使用することです.https://github.com/zhujunxxxxx/
オリジナル宣言
作者の小竹zz本文の住所http://blog.csdn.net/zhujunxxxxx/article/details/48742529、転載する場合は出典を明記してください