Netty実戦-netty client接続プール設計

31775 ワード

概要
最近、netty clientでは、nettyのchannel接続プールがどのように設計されるべきかを問い合わせるネットユーザーが多い.これは少し複雑なテーマで、多くの技術点にかかわっていますが、ネット上で関連する比較的完全な参考文章を見つけるには、確かに容易ではありません.
この文書では、ソリューションの1つと、実行可能な完全なコードが付属しています.もしネットユーザーがもっと良い案があれば、本文に返事して、私たちは一緒に討論して、一緒に構想と視野を広げることができます.
本文を読む前にいくつかの基礎知識が必要です.
1、nettyのいくつかの基礎知識を知っていて、例えばByteBuf類の関連api;2、nettyの実行プロセスを知っている;3、私が前に書いたnetty実戦-カスタムデコーダが半パッケージメッセージを処理することを読んだことがある必要があります.本明細書のコードの一部はこの文章から来ているからです.
現在、マイクロサービスは非常に人気があり、多くの会社が利用しています.マイクロサービスフレームワークでは、thrift、grpcをデータシーケンス化フレームワークとして使用すると、通常、クライアントユーザにSDKが生成されて使用される.クライアントはこのSDKを使用すれば,サービス側のマイクロサービスインタフェースを容易に呼び出すことができる.本稿では,SDKを用いたnettyクライアントのnetty channel接続プールの設計案について論じる.netty http clientのchannel接続プールの設計については、httpに基づいて、別のテーマであり、別途文章を書いて議論する必要があります.
Netty channel接続プール設計
DB接続プールでは、あるスレッドがdb connectionを取得した後、データの読み取りや書き込みが完了しない場合、スレッドがタスクを完了するまで、このdb connectionはスレッドに独占されています.netty clientのchannel接続プール設計もこのような独占的な方法を使用すると、いくつかの問題があります.
1、nettyのchannelのwriteAndFlushメソッドは、呼び出しが完了すると戻り結果を待つ必要はありません.writeAndFlushが呼び出されると、すぐに戻ります.この場合,スレッドにchannelを独占させる必要は全くない.2、DB poolのような形で池から接続を取り、使い終わったら戻る.また,要求量が大きい場合は接続が不十分であり,他のスレッドも他のスレッドが接続を解放するのを待つしかない.
そのため、上記の方法でnetty channel接続プールを設計することはあまりお勧めできません.channel独占の代価が大きすぎます.チャンネル配列の形式を使用して、nettyのチャンネルを多重化することができます.スレッドがチャネルを必要とする場合は、配列からランダムにチャネルを選択し、チャネルが確立されていない場合は作成します.スレッドで選択したチャネルが確立されている場合は、このチャネルを多重化します.
ここに画像の説明を書きます
チャンネル配列の長さを4とする
private Channel[] channels = new Channel[4];

外部システムがclientを要求すると、clientはchannels配列からランダムにchannelを選択し、channelが確立されていない場合、channelを確立する論理をトリガーする.いくらリクエストがあっても、この4つのchannelを多重化します.10個のスレッドがあると仮定すると、一部のスレッドは同じchannelを使用してデータを送信し、受信する可能性があります.ランダムに1つのチャネルを選択するので、複数のスレッドが同じチャネルにヒットする確率は大きいです.以下の図
ここに画像の説明を書きます
10スレッドのうち、3スレッドがchannel 2を使用してデータを送信している可能性があります.これは別の問題を導入します.thread 1はchannel 2を介してメッセージmsg 1をサービス側に送信し、thread 2もchannel 2を介してメッセージmsg 2をサービス側に送信し、サービス側がデータを処理し、channel 2を介してクライアントにデータを返す場合、どのメッセージがどのスレッドであるかをどのように区別しますか?もし区別しなければ、もしthread 1が得た結果がthread 2が望んだ結果だったら、どうしますか?
では、thread 1とthread 2に自分の望む結果を得るにはどうすればいいのでしょうか.
以前、netty実戦-カスタムデコーダが半パケットメッセージを処理する文で述べたように、カスタムメッセージの場合、メッセージを一意に識別するためにメッセージにシーケンス番号を追加することが多い.thread 1がメッセージを送信すると、メッセージに一意のメッセージシーケンス番号を挿入し、thread 1のためにcallbackコールバックプログラムを確立し、サービス側がメッセージを返すと、メッセージのシーケンス番号に基づいて対応するcallbackプログラムから結果を取得する.これで上記の問題を解決できます.
メッセージ形式
ここに画像の説明を書きます
メッセージ、メッセージseqおよびcallback対応関係
ここに画像の説明を書きます
ここに画像の説明を書きます
OK、以下、上記の設計に基づいて符号化を行う.
コード#コード#
まずnettyクライアントを実現し,10スレッドの同時取得チャネルを設定し,本格的な同時取得のためにCountDownLatchを用いてスイッチを行い,同時にチャネル接続プールに4チャネルを設定する.
package nettyinaction.nettyclient.channelpool.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import nettyinaction.nettyclient.channelpool.ChannelUtils;
import nettyinaction.nettyclient.channelpool.IntegerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        //         ,  ,           netty channel
        final CountDownLatch countDownLatchBegin = new CountDownLatch(1);

        //            ,     ,          
        final CountDownLatch countDownLatchEnd = new CountDownLatch(10);

        //netty channel 
        final NettyChannelPool nettyChannelPool = new NettyChannelPool();

        final Map resultsMap = new HashMap<>();
        //  10   ,      netty channel
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        //    block 
                        countDownLatchBegin.await();

                        Channel channel = null;
                        try {
                            channel = nettyChannelPool.syncGetChannel();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                        //         callback,        , callback     
                        CallbackService callbackService = new CallbackService();
                        //               
                        int seq = IntegerFactory.getInstance().incrementAndGet();
                        //  Channel attr  ,     callback     
                        ChannelUtils.putCallback2DataMap(channel,seq,callbackService);

                        synchronized (callbackService) {
                            UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
                            ByteBuf buffer = allocator.buffer(20);
                            buffer.writeInt(ChannelUtils.MESSAGE_LENGTH);

                            buffer.writeInt(seq);
                            String threadName = Thread.currentThread().getName();
                            buffer.writeBytes(threadName.getBytes());
                            buffer.writeBytes("body".getBytes());

                            // netty        ,   ,        
                            channel.writeAndFlush(buffer);

                            //      
                            callbackService.wait();

                            //    ,  result callback       。
                            ByteBuf result = callbackService.result;
                            int length = result.readInt();
                            int seqFromServer = result.readInt();

                            byte[] head = new byte[8];
                            result.readBytes(head);
                            String headString = new String(head);

                            byte[] body = new byte[4];
                            result.readBytes(body);
                            String bodyString = new String(body);
                            resultsMap.put(threadName, seqFromServer + headString + bodyString);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    finally {
                        countDownLatchEnd.countDown();
                    }
                }
            }).start();
        }

        //  , 10       netty channel
        countDownLatchBegin.countDown();

        // 10       ,      
        countDownLatchEnd.await();
        System.out.println("resultMap="+resultsMap);
    }

    public static class CallbackService{
        public volatile ByteBuf result;
        public void receiveMessage(ByteBuf receiveBuf) throws Exception {
            synchronized (this) {
                result = receiveBuf;
                this.notify();
            }
        }
    }
}

IntegerFactoryクラスがメッセージを生成するために使用する一意のシリアル番号
package nettyinaction.nettyclient.channelpool;


import java.util.concurrent.atomic.AtomicInteger;

public class IntegerFactory {
    private static class SingletonHolder {
        private static final AtomicInteger INSTANCE = new AtomicInteger();
    }

    private IntegerFactory(){}

    public static final AtomicInteger getInstance() {
        return SingletonHolder.INSTANCE;
    }
}

一方、ChannelUtilsクラスは、channel、メッセージシーケンス番号、callbackプログラムの対応関係を確立するために使用されます.
package nettyinaction.nettyclient.channelpool;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

import java.util.Map;

public class ChannelUtils {
    public static final int MESSAGE_LENGTH = 16;
    public static final AttributeKey> DATA_MAP_ATTRIBUTEKEY = AttributeKey.valueOf("dataMap");
    public static  void putCallback2DataMap(Channel channel, int seq, T callback) {
        channel.attr(DATA_MAP_ATTRIBUTEKEY).get().put(seq, callback);
    }

    public static  T removeCallback(Channel channel, int seq) {
        return (T) channel.attr(DATA_MAP_ATTRIBUTEKEY).get().remove(seq);
    }
}

NettyChannelPoolはnettyのchannelの作成を担当します.
package nettyinaction.nettyclient.channelpool.client;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.Attribute;
import nettyinaction.nettyclient.channelpool.ChannelUtils;
import nettyinaction.nettyclient.channelpool.SelfDefineEncodeHandler;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class NettyChannelPool {
    private Channel[] channels;
    private Object [] locks;
    private static final int MAX_CHANNEL_COUNT = 4;

    public NettyChannelPool() {
        this.channels = new Channel[MAX_CHANNEL_COUNT];
        this.locks = new Object[MAX_CHANNEL_COUNT];
        for (int i = 0; i < MAX_CHANNEL_COUNT; i++) {
            this.locks[i] = new Object();
        }
    }

    /**
     *     netty channel
     */
    public Channel syncGetChannel() throws InterruptedException {
        //       ,         channel
        int index = new Random().nextInt(MAX_CHANNEL_COUNT);
        Channel channel = channels[index];
        //      ,    
        if (channel != null && channel.isActive()) {
            return channel;
        }

        synchronized (locks[index]) {
            channel = channels[index];
            //         ,      ,                  。
            if (channel != null && channel.isActive()) {
                return channel;
            }

            //        ,  channel
            channel = connectToServer();

            channels[index] = channel;
        }

        return channel;
    }

    private Channel connectToServer() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                 .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .handler(new ChannelInitializer() {
                     @Override
                     protected void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline pipeline = ch.pipeline();
                         pipeline.addLast(new SelfDefineEncodeHandler());
                         pipeline.addLast(new SocketClientHandler());
                     }
                 });

        ChannelFuture channelFuture = bootstrap.connect("localhost", 8899);
        Channel channel = channelFuture.sync().channel();

        //      channel,   channel  
        Attribute> attribute = channel.attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
        ConcurrentHashMap dataMap = new ConcurrentHashMap<>();
        attribute.set(dataMap);
        return channel;
    }
}

まず構造法を用いてchannels配列を初期化し,長さは4であった.NettyChannelPoolクラスには2つの重要な点があります.1つ目はチャンネルを取得するときに鍵をかけなければなりません.もう1つは,チャネルを取得した後,チャネルの属性を用いてMapを作成し,このMapを用いてメッセージシーケンス番号とcallbackプログラムとの対応関係を確立する必要がある.
//   channel  
        Attribute> attribute = channel.attr(ChannelUtils.DATA_MAP_ATTRIBUTEKEY);
        ConcurrentHashMap dataMap = new ConcurrentHashMap<>();
        attribute.set(dataMap);

このmapは私たちが見たここに画像の説明を書きます.
Mapのputの動作は、SocketClientクラスの
ChannelUtils.putCallback2DataMap(channel,seq,callbackService);

を選択してもアクセスできます.クライアントがメッセージを処理するには、半パケットの問題を処理する2つのhanlder支援が必要です.1つは、サービス側から返されたメッセージを受信することです.
SelfDefineEncodeHandlerクラスは、半パッケージメッセージを処理するために使用されます.
package nettyinaction.nettyclient.channelpool;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List out) throws Exception {
        if (bufferIn.readableBytes() < 4) {
            return;
        }

        int beginIndex = bufferIn.readerIndex();
        int length = bufferIn.readInt();

        if (bufferIn.readableBytes() < length) {
            bufferIn.readerIndex(beginIndex);
            return;
        }

        bufferIn.readerIndex(beginIndex + 4 + length);

        ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);

        otherByteBufRef.retain();

        out.add(otherByteBufRef);
    }
}

SocketClientHandlerクラスは、サービス側から返されたメッセージを受信し、メッセージシーケンス番号に基づいて対応するcallbackプログラムを取得するために使用される.
package nettyinaction.nettyclient.channelpool.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import nettyinaction.nettyclient.channelpool.ChannelUtils;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();

        ByteBuf responseBuf = (ByteBuf)msg;
        responseBuf.markReaderIndex();

        int length = responseBuf.readInt();
        int seq = responseBuf.readInt();

        responseBuf.resetReaderIndex();

        //       callback
        SocketClient.CallbackService callbackService = ChannelUtils.removeCallback(channel, seq);
        callbackService.receiveMessage(responseBuf);
    }
}

このクライアントプログラムの作成が完了しました.サービス側のコードについては、簡単ですが、ここに直接コードを貼ります.
package nettyinaction.nettyclient.channelpool.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import nettyinaction.nettyclient.channelpool.SelfDefineEncodeHandler;

public class SocketServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                           .channel(NioServerSocketChannel.class)
                           .handler(new LoggingHandler(LogLevel.INFO))
                           .childHandler(new ChannelInitializer() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    pipeline.addLast(new SelfDefineEncodeHandler());
                                    pipeline.addLast(new BusinessServerHandler());
                                }
                           });

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

package nettyinaction.nettyclient.channelpool.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import nettyinaction.nettyclient.channelpool.ChannelUtils;

public class BusinessServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        ByteBuf buf = (ByteBuf)msg;
        //1、      
        int length = buf.readInt();

        //2、       
        int seq = buf.readInt();

        //3、      
        byte[] head = new byte[8];
        buf.readBytes(head);
        String headString = new String(head);

        //4、     
        byte[] body = new byte[4];
        buf.readBytes(body);
        String bodyString = new String(body);

        //5、        ,    ,      
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf responseBuf = allocator.buffer(20);
        responseBuf.writeInt(ChannelUtils.MESSAGE_LENGTH);
        responseBuf.writeInt(seq);
        responseBuf.writeBytes(headString.getBytes());
        responseBuf.writeBytes(bodyString.getBytes());

        //6、         
        channel.writeAndFlush(responseBuf);
    }
}

サービス側コードとクライアントコードを実行し、所望の結果は
10個のスレッドがメッセージを送信すると、サービス側から正しい対応の返信情報を取得することができ、これらの情報は乱れず、各スレッドが自分の望む結果を取得することができ、誤読が発生しない.
運転後の結果は以下の通りです.
Thread-3=9
Thread-3body,
Thread-4=8
Thread-4body,
Thread-5=5Thread-5body,
Thread-6=1Thread-6body,
Thread-7=3Thread-7body,
Thread-8=10Thread-8body,
Thread-9=4Thread-9body,
Thread-0=7Thread-0body,
Thread-1=6Thread-1body,
Thread-2=2Thread-2body
結果を観察すると,10個のスレッドが同時にchannelを取得した後,一部のスレッドは1個のchannelを共有するが,10個のスレッドは依然として結果を正しく取得できることが分かった.
コード詳細解析
1、サービス側の返却待ち
なぜならwriteAndFlushは非同期であり、スレッドがサービス側から結果を返すのを待つメカニズムが必要である.ここでは最も原始的なwaitとnotify法を採用する.writeAndFlushが呼び出すと、すぐに現在のスレッドwaitを止めてcallbackserviceオブジェクトの待機リストに配置し、サーバ側からメッセージが返されるとクライアントのSocketClientHandlerクラスのchannelReadメソッドが実行され、データを解析した後、channelのattrプロパティからDATA_を取得するMAP_ATTRIBUTEKEYというkey対応のmap.解析したseqに基づいてmapからあらかじめ配置されたcallbackserviceオブジェクトを取得し,そのreceiveMessageメソッドを実行する.結果を格納するreceiveBufというキャッシュ領域オブジェクトをcallbackserviceのresultプロパティに割り当てます.callbackserviceオブジェクトのnotifyメソッドを呼び出し、callbackserviceオブジェクトのwaitスレッドを呼び出し、下に実行させます.
2、メッセージシリアル番号の生成
                        int seq = IntegerFactory.getInstance().incrementAndGet();

プレゼンテーションの便宜上、ここでは単一サーバのグローバル一意のシーケンス番号を生成します.リクエスト量が多ければAtomicIntegerがCAS操作であっても多くの競合が生じる.チャネルレベルの一意のシリアル番号を生成し、競合を低減することを推奨します.1つのチャネル内のメッセージのシーケンス番号が重複しないことを保証すればよい.
他のコードの詳細については、読者は自分でもっと詳しく見ることができます.
テキストリンク
Netty実戦-netty client接続プール設計