Nettyのドキドキ


ハートビート定義ハートビートは、TCP長接続でクライアントとサーバの間で定期的に送信される特殊なパケットであり、TCP接続の有効性を確保するために、相手自身がオンラインであることを通知する.なぜハートビートが必要なのかは、ネットワークの信頼性が低いため、TCPが長い接続を維持している間に、ネットワークケーブルが抜かれたり、突然電源が落ちたりするなどの突発的な状況で、サーバとクライアントの接続が中断する可能性があります.これらのバーストの場合、サーバとクライアントとの間にちょうどインタラクションがなければ、短時間で相手がオフラインであることを発見することはできない.この問題を解決するためには,心拍数メカニズムを導入する必要がある.ハートビートメカニズムの動作原理は、サーバとクライアントの間に一定時間データのインタラクションがない場合、すなわちidle状態にある場合、クライアントまたはサーバは特殊なパケットを相手に送信し、受信者がこのデータメッセージを受信した後、すぐに特殊なデータメッセージを送信し、送信者に応答し、PING-PONGのインタラクションを完了する.当然,ある一端にハートビートメッセージが届くと,相手がまだオンラインであることがわかり,TCP接続の有効性が確保される.Nettyでは、心拍数メカニズムを実現する鍵はIdleStateHandlerであり、1つのchannelの読み取り/書き込みにタイマを設定することができ、channelが一定のイベント間隔内でデータインタラクションがない場合(すなわちidle状態)、指定されたイベントがトリガーされます.IdleStateHandlerの原理分析について:https://www.cnblogs.com/duan2/p/8919458.html例示的な例は、クライアントとサーバがTCP長接続を介して通信することを示し、TCPメッセージフォーマットは以下の通りである.
+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+
  • クライアントは、ランダムな時間毎にサーバにメッセージを送信し、サーバがメッセージを受信すると、直ちに受信したメッセージをそのままクライアントに返信する.
  • クライアントが指定した時間間隔で読み書き操作を行わない場合、クライアントは自動的にサーバにPING心拍を送信し、サーバがPING心拍メッセージを受信した場合、PONGメッセージに返信する必要がある.

  • CustomHeartbeatHandlerは、IdleStateHandlerによってトリガーされたIdleStateEventイベントをキャプチャし、userEventTriggeredで異なるタイプのイベントを受信して対応する処理を行い、心拍の送信と受信を担当します.
    public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler {
    
        public static final byte PING_MSG = 1;
        public static final byte PONG_MSG = 2;
        public static final byte DATA_MSG = 3;
        protected String name;
        private int heartbeatCount = 0;
    
        public CustomHeartbeatHandler(String name) {
            this.name = name;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            if (byteBuf.getByte(4) == PING_MSG) {
                sendPongMsg(ctx);
            } else if (byteBuf.getByte(4) == PONG_MSG) {
    
                System.out.println(name
                        + " get pong msg from " + ctx.channel().remoteAddress());
            } else {
                handleData(ctx, byteBuf);
            }
        }
    
        protected void sendPingMsg(ChannelHandlerContext context) {
            ByteBuf buf = context.alloc().buffer(Constant.PACK_BASE_LENGTH);
            buf.writeInt(Constant.PACK_BASE_LENGTH); //     4 +     1
            buf.writeByte(PING_MSG);
            context.writeAndFlush(buf);
            heartbeatCount++;
    
            System.out.println(name
                    + " sent ping msg to " + context.channel().remoteAddress()
                    + " count: " + heartbeatCount
                    + " current time: " + DateUtil.dateFormat());
        }
    
        private void sendPongMsg(ChannelHandlerContext context) {
            ByteBuf buf = context.alloc().buffer(Constant.PACK_BASE_LENGTH);
            buf.writeInt(Constant.PACK_BASE_LENGTH); //     4 +     1
            buf.writeByte(PONG_MSG);
            context.channel().writeAndFlush(buf);
            heartbeatCount++;
            System.out.println(name
                    + " sent pong msg to " + context.channel().remoteAddress()
                    + " count: " + heartbeatCount
                    + " current time: " + DateUtil.dateFormat());
        }
    
        protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                switch (e.state()) {
                    case READER_IDLE:
                        handleReaderIdle(ctx);
                        break;
                    case WRITER_IDLE:
                        handleWriterIdle(ctx);
                        break;
                    case ALL_IDLE:
                        handleAllIdle(ctx);
                        break;
                    default:
                        break;
                }
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("--- " + ctx.channel().remoteAddress() + " is active ---");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.err.println("--- " + ctx.channel().remoteAddress() + " is inactive ---");
        }
    
        protected void handleReaderIdle(ChannelHandlerContext ctx) {
            System.err.println("--- READER_IDLE ---");
        }
    
        protected void handleWriterIdle(ChannelHandlerContext ctx) {
            System.err.println("--- WRITER_IDLE ---");
        }
    
        protected void handleAllIdle(ChannelHandlerContext ctx) {
            System.err.println("--- ALL_IDLE ---");
        }
    
        @Override
        protected void messageReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
    
        }
    }

    クライアントhandlerはサービス側の対応するデータを受信し、読み書きが空きpingパケットを送信し、tcp断線が再接続される.
    public class ClientHandler extends CustomHeartbeatHandler {
    
        private NettyClient client;
    
        public ClientHandler(NettyClient client) {
            super("client");
            this.client = client;
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
            byte[] data = new byte[byteBuf.readableBytes() - Constant.PACK_BASE_LENGTH];
            byteBuf.skipBytes(Constant.PACK_BASE_LENGTH);
            byteBuf.readBytes(data);
    
            System.out.println(name
                    + " receive message: " + new String(data));
        }
    
        @Override
        protected void handleAllIdle(ChannelHandlerContext ctx) {
            super.handleAllIdle(ctx);
            sendPingMsg(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            client.doConnect(); //   tcp     
        }
    }

    クライアント起動クラス
    public class NettyClient {
    
        private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        private Channel channel;
        private Bootstrap bootstrap;
    
        public void start() {
            try {
                bootstrap = new Bootstrap();
                bootstrap.group(workGroup)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline p = socketChannel.pipeline();
                                p.addLast(new IdleStateHandler(0, 0, 5));
                                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                                p.addLast(new ClientHandler(NettyClient.this));
                            }
                        });
                this.doConnect();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        protected void doConnect() {
            if (channel != null && channel.isActive()) {
                return;
            }
            ChannelFuture future = bootstrap.connect("127.0.0.1", 9527);
            future.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture futureListener) throws Exception {
                    if (futureListener.isSuccess()) {
                        channel = futureListener.channel();
                        System.out.println("Connect to server successfully!");
                    } else {
                        System.out.println("Failed to connect to server, try connect after 10s");
                        futureListener.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                doConnect();
                            }
                        }, 10, TimeUnit.SECONDS);
                    }
                }
            });
        }
    
        public void sendData() throws Exception {
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < 10000; i++) {
                if (channel != null && channel.isActive()) {
                    String content = "client req " + i;
                    ByteBuf buf = channel.alloc().buffer(Constant.PACK_BASE_LENGTH + content.getBytes().length);
                    buf.writeInt(Constant.PACK_BASE_LENGTH + content.getBytes().length);
                    buf.writeByte(CustomHeartbeatHandler.DATA_MSG);
                    buf.writeBytes(content.getBytes());
                    channel.writeAndFlush(buf);
                    System.out.println("client send message: " + content
                            + " current time: " + DateUtil.dateFormat());
                }
                Thread.sleep(random.nextInt(20000));
            }
        }
    
        public static void main(String[] args) throws Exception {
            NettyClient client = new NettyClient();
            client.start();
            client.sendData();
        }
    }

    サービス側handlerはクライアントメッセージを受信し、アイドル時にtcp接続を切断する
    public class ServerHandler extends CustomHeartbeatHandler {
    
        public ServerHandler() {
            super("server");
        }
    
        @Override
        protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
            byte[] data = new byte[buf.readableBytes() - Constant.PACK_BASE_LENGTH];
            ByteBuf respBuf = Unpooled.copiedBuffer(buf);
            buf.skipBytes(Constant.PACK_BASE_LENGTH);
            buf.readBytes(data);
            System.out.println(name
                    + " receive message: " + new String(data)
                    + " current time: " + DateUtil.dateFormat());
            channelHandlerContext.write(respBuf);
        }
    
        @Override
        protected void handleReaderIdle(ChannelHandlerContext ctx) {
            super.handleReaderIdle(ctx);
            System.err.println("--- client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it ---");
            ctx.close();
        }
    }

    サービス側起動クラス
    public class NettyServer {
        public static void main(String[] args) {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap
                        .group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline p = socketChannel.pipeline();
                                p.addLast(new IdleStateHandler(10, 0, 0));
                                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                                p.addLast(new ServerHandler());
                            }
                        });
    
                Channel ch = bootstrap.bind(9527).sync().channel();
                ch.closeFuture().sync();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }