RPCの転送編(SocketとNetty)を深く理解する


目次
RpcRequestとRpcResponse
Socket転送
Netty転送
同期と非同期のブロックと非ブロック
まとめ 
RPCは「リモート・プロシージャ・コール」と呼ばれ、メソッド・コールがネットワークを越え、プロセスを越えるため、伝送層が不可欠であることを示しています.ネットワーク転送といえば、TCP、UDP、HTTP、同期or非同期、ブロックor非ブロック、長接続or短接続...
この論文では、Socketの使用とNettyの使用の2つの伝送層の実装について説明します.前者はブロック式の通信を実現し、比較的簡単な伝送層実現方式であり、伝送層の動作原理と動作内容を理解することができる.後者は非ブロック式であり,一般的なRPCシーンでは性能が良好に表現されるため,多くのオープンソースRPCフレームワークが伝送層の実現方式として用いられる.
RpcRequestとRpcResponse
トランスポートレイヤトランスポートの主なオブジェクトは、要求id、メソッド名、メソッドパラメータ、戻り値、異常などのRPC呼び出しに必要な一連の情報をカプセル化した2つのクラスです.
public class RpcRequest implements Serializable {
    private String interfaceName;
    private String methodName;
    private String parametersDesc;
    private Object[] arguments;
    private Map attachments;
    private int retries = 0;
    private long requestId;
    private byte rpcProtocolVersion;
}
public class RpcResponse implements Serializable {
    private Object value;
    private Exception exception;
    private long requestId;
    private long processTime;
    private int timeout;
    private Map attachments;// rpc                  
    private byte rpcProtocolVersion;
}

Socket転送
Server
public class RpcServerSocketProvider {

    public static void main(String[] args) throws Exception {

        //             
        Serialization serialization = new Hessian2Serialization();

        ServerSocket serverSocket = new ServerSocket(8088);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        while (true) {
            final Socket socket = serverSocket.accept();
            executorService.execute(() -> {
                try {
                    InputStream is = socket.getInputStream();
                    OutputStream os = socket.getOutputStream();
                    try {
                        DataInputStream dis = new DataInputStream(is);
                        int length = dis.readInt();
                        byte[] requestBody = new byte[length];
                        dis.read(requestBody);
                        //    requestBody => RpcRequest
                        RpcRequest rpcRequest = serialization.deserialize(requestBody, RpcRequest.class);
                        //              rpcResponse
                        RpcResponse rpcResponse = invoke(rpcRequest);
                        //   rpcResponse => responseBody
                        byte[] responseBody = serialization.serialize(rpcResponse);
                        DataOutputStream dos = new DataOutputStream(os);
                        dos.writeInt(responseBody.length);
                        dos.write(responseBody);
                        dos.flush();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        is.close();
                        os.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        socket.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

    }

    public static RpcResponse invoke(RpcRequest rpcRequest) {
        //      
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        //... some operation
        return rpcResponse;
    }

}

Client
public class RpcSocketConsumer {

    public static void main(String[] args) throws Exception {

        //             
        Serialization serialization = new Hessian2Serialization();

        Socket socket = new Socket("localhost", 8088);
        InputStream is = socket.getInputStream();
        OutputStream os = socket.getOutputStream();
        //  rpc  
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setRequestId(12345L);
        //    rpcRequest => requestBody
        byte[] requestBody = serialization.serialize(rpcRequest);
        DataOutputStream dos = new DataOutputStream(os);
        dos.writeInt(requestBody.length);
        dos.write(requestBody);
        dos.flush();
        DataInputStream dis = new DataInputStream(is);
        int length = dis.readInt();
        byte[] responseBody = new byte[length];
        dis.read(responseBody);
        //     responseBody => rpcResponse
        RpcResponse rpcResponse = serialization.deserialize(responseBody, RpcResponse.class);
        is.close();
        os.close();
        socket.close();

        System.out.println(rpcResponse.getRequestId());
    }
}

dis.readInt()とdis.read(byte[]bytes)はSocket通信を使用することを決定したブロック式の操作であり、メッセージヘッダ+メッセージ体の伝送フォーマットは一般的なフォーマットであるほか、特殊な文字、例えば空行を使用してメッセージ構造を区別することもできる.例では、int(4バイト)を使用して、問題の長さを報告し、その後、新聞体を伝達します.複雑な通信プロトコルでは、新聞ヘッドは、プロトコル名、バージョン、心拍数識別など、新聞体に加えて、いくつかの情報を格納します.
ネットワーク転送ではバイトのみが認識できるため,RpcRequestとRpcResponseとバイトの相互変換を完了するSerializationインタフェースを先頭に導入した.(Serializationの仕組みは前の記事を参考にできます)
Socket通信を使用すると、サーバがClientリクエストを処理するたびにスレッドプールからスレッドを取り出してリクエストを処理するコストは、一般的なRpc呼び出しでは受け入れられず、Nettyのようなネットワークフレームワークが役立ちます.
Netty転送
ServerとServer Handler
public class RpcNettyProvider {

    public static void main(String[] args) throws Exception{

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //        Netty     Bootstrap   
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new RpcDecoder(RpcRequest.class)); //    RPC   
                    pipeline.addLast(new RpcEncoder(RpcResponse.class)); //    RPC   
                    pipeline.addLast(new RpcServerHandler()); //    RPC   
                }
            });
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind("127.0.0.1", 8087).sync();
            //    RPC    
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}
public class RpcServerHandler extends SimpleChannelInboundHandler {

    @Override
    public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        RpcResponse rpcResponse = invoke(request);
        //    RPC            
        ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private RpcResponse invoke(RpcRequest rpcRequest) {
        //      
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        //... some operation
        return rpcResponse;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientとClientHandler
public class RpcNettyConsumer {

    public static void main(String[] args) throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //        Netty     Bootstrap   
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast(new RpcEncoder(RpcRequest.class)); //    RPC   
                    pipeline.addLast(new RpcDecoder(RpcResponse.class)); //    RPC   
                    pipeline.addLast(new RpcClientHandler()); //    RPC   
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            //    RPC    
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8087).sync();
            //    RPC          
            Channel channel = future.channel();

            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setRequestId(123456L);

            channel.writeAndFlush(rpcRequest).sync();
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}
public class RpcClientHandler extends SimpleChannelInboundHandler {

    @Override
    public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
        System.out.println(response.getRequestId());//    
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

Nettyを使用する利点は、非ブロック呼び出しを容易に実現することであり、重要な部分にはコメントが表示されます.上記のコードは多く、よく知られているSocket通信コードとは大きく異なりますが、多くはNettyのテンプレートコード、サーバの起動、コーデックの構成などです.実際のRPCパッケージング操作の多くはHandlerのchannelReadメソッド(読み取り担当)とchannel.writeAndFlushメソッド(書き込み担当)に集中している.
public class RpcEncoder extends MessageToByteEncoder {

    private Class> genericClass;

    Serialization serialization = new Hessian2Serialization();

    public RpcEncoder(Class> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = serialization.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
public class RpcDecoder extends ByteToMessageDecoder {

    private Class> genericClass;

    public RpcDecoder(Class> genericClass) {
        this.genericClass = genericClass;
    }

    Serialization serialization = new Hessian2Serialization();

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(serialization.deserialize(data, genericClass));
    }
}

Nettyを使用すると返されるバイトサイズが保証されないため、in.readableBytes()<4という判断と、in.markReader Index()というマークを付けて、ヘッダとメッセージ体を区別する必要があります.
同期と非同期のブロックと非ブロック
この2組の伝送特性はよく比較され,多くの文章はSocketが同期的にブロックされていると主張しているが,Nettyは非同期非ブロックであり,実際には少し問題がある.
実際には、この2つのグループには必然的な関連はありません.同期ブロック、同期非ブロック、非同期非ブロックがあります(同期非ブロックは見たことがありませんが)、Nettyを使用して実装されるRPC呼び出しの多くは、同期非ブロックであるべきです(もちろん、一般的にはRPCも非同期非ブロックをサポートしています).
同期と非同期は、メッセージ通信メカニズムのいわゆる同期に注目し、呼び出しが発行されると、結果が得られない前に呼び出しが返されない.しかし、戻りを呼び出すと、戻り値が得られます.すなわち,呼び出し者がこの呼び出しの結果を自発的に待つ.
非同期は逆に,呼び出しが発行されると,この呼び出しは直接返されるので結果は返されない.すなわち、非同期プロシージャ呼び出しが発行されると、呼び出し者はすぐに結果を得ることはできない.呼び出しが発行された後、被呼び出し者はステータス、通知によって呼び出し者に通知するか、コールバック関数によってこの呼び出しを処理する.
RPC呼び出しが結果を返す必要がある場合、その結果はすぐに使用され、それは、大体のレートが同期呼び出しである必要があることを意味する.戻り値に関心がなければ、非同期インタフェースとして効率を向上させることができます.
ブロックと非ブロックは、プログラムが呼び出し結果(メッセージ、戻り値)を待つときの状態に注目する.
ブロック呼び出しとは、呼び出し結果が返される前に、現在のスレッドが保留されることです.呼び出しスレッドは、結果が得られた後にのみ返されます.非ブロック呼び出しとは、すぐに結果が得られないまで、現在のスレッドをブロックしないことを意味します.
上記の例では、Socket通信は、10個のスレッドを含むスレッドプールを宣言し、要求が来るたびにスレッドを割り当て、クライアントがメッセージヘッダとメッセージボディを伝達するのを待つ動作がスレッドをブロックし、全体がブロックされていることがわかります.一方、Netty通信の例では、要求ごとにスレッドが割り当てられず、Handler方式で要求(レノボNIOでSelector)が処理され、ブロックされていない.
同期非ブロック方式を用いた通信メカニズムは必ずしも同期ブロック式の通信が強いとは限らない.最適ではなく、より適切であるが、一般的な同期非ブロック通信は1.ネットワーク接続数が多い2.接続ごとのioが頻繁ではないシーンに適用され、RPC呼び出しと比較的合致する.成熟したRPCフレームワークの伝送層とプロトコル層は、通常、異なるシーンに対応するために多くの選択肢を提供する.
まとめ
本文はいくつかのコードを積み上げたが,難点は主にSocketの理解とNettyフレームワークの把握である.Nettyの学習には一定の敷居があるが、実際に身につけなければならない知識点はそれほど多くない(RPCフレームワークに関連する知識点だけについて)、Nettyの学習、個人的な推薦「Netty IN ACTION」および https://waylau.gitbooks.io/netty-4-user-guide/Getting%20Started/Before%20Getting%20Started.html このサイトの例です.
参考資料:
http://javatar.iteye.com/blog/1123915 – 梁飛
https://gitee.com/huangyong/rpc – 黄勇
転載先:https://www.cnkirito.moe/rpc-transport/