Nettyのドキドキ
11358 ワード
ハートビート定義ハートビートは、TCP長接続でクライアントとサーバの間で定期的に送信される特殊なパケットであり、TCP接続の有効性を確保するために、相手自身がオンラインであることを通知する.なぜハートビートが必要なのかは、ネットワークの信頼性が低いため、TCPが長い接続を維持している間に、ネットワークケーブルが抜かれたり、突然電源が落ちたりするなどの突発的な状況で、サーバとクライアントの接続が中断する可能性があります.これらのバーストの場合、サーバとクライアントとの間にちょうどインタラクションがなければ、短時間で相手がオフラインであることを発見することはできない.この問題を解決するためには,心拍数メカニズムを導入する必要がある.ハートビートメカニズムの動作原理は、サーバとクライアントの間に一定時間データのインタラクションがない場合、すなわちidle状態にある場合、クライアントまたはサーバは特殊なパケットを相手に送信し、受信者がこのデータメッセージを受信した後、すぐに特殊なデータメッセージを送信し、送信者に応答し、PING-PONGのインタラクションを完了する.当然,ある一端にハートビートメッセージが届くと,相手がまだオンラインであることがわかり,TCP接続の有効性が確保される.Nettyでは、心拍数メカニズムを実現する鍵はIdleStateHandlerであり、1つのchannelの読み取り/書き込みにタイマを設定することができ、channelが一定のイベント間隔内でデータインタラクションがない場合(すなわちidle状態)、指定されたイベントがトリガーされます.IdleStateHandlerの原理分析について:https://www.cnblogs.com/duan2/p/8919458.html例示的な例は、クライアントとサーバがTCP長接続を介して通信することを示し、TCPメッセージフォーマットは以下の通りである.クライアントは、ランダムな時間毎にサーバにメッセージを送信し、サーバがメッセージを受信すると、直ちに受信したメッセージをそのままクライアントに返信する. クライアントが指定した時間間隔で読み書き操作を行わない場合、クライアントは自動的にサーバにPING心拍を送信し、サーバがPING心拍メッセージを受信した場合、PONGメッセージに返信する必要がある.
CustomHeartbeatHandlerは、IdleStateHandlerによってトリガーされたIdleStateEventイベントをキャプチャし、userEventTriggeredで異なるタイプのイベントを受信して対応する処理を行い、心拍の送信と受信を担当します.
クライアントhandlerはサービス側の対応するデータを受信し、読み書きが空きpingパケットを送信し、tcp断線が再接続される.
クライアント起動クラス
サービス側handlerはクライアントメッセージを受信し、アイドル時にtcp接続を切断する
サービス側起動クラス
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
CustomHeartbeatHandlerは、IdleStateHandlerによってトリガーされたIdleStateEventイベントをキャプチャし、userEventTriggeredで異なるタイプのイベントを受信して対応する処理を行い、心拍の送信と受信を担当します.
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler {
public static final byte PING_MSG = 1;
public static final byte PONG_MSG = 2;
public static final byte DATA_MSG = 3;
protected String name;
private int heartbeatCount = 0;
public CustomHeartbeatHandler(String name) {
this.name = name;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(ctx);
} else if (byteBuf.getByte(4) == PONG_MSG) {
System.out.println(name
+ " get pong msg from " + ctx.channel().remoteAddress());
} else {
handleData(ctx, byteBuf);
}
}
protected void sendPingMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(Constant.PACK_BASE_LENGTH);
buf.writeInt(Constant.PACK_BASE_LENGTH); // 4 + 1
buf.writeByte(PING_MSG);
context.writeAndFlush(buf);
heartbeatCount++;
System.out.println(name
+ " sent ping msg to " + context.channel().remoteAddress()
+ " count: " + heartbeatCount
+ " current time: " + DateUtil.dateFormat());
}
private void sendPongMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(Constant.PACK_BASE_LENGTH);
buf.writeInt(Constant.PACK_BASE_LENGTH); // 4 + 1
buf.writeByte(PONG_MSG);
context.channel().writeAndFlush(buf);
heartbeatCount++;
System.out.println(name
+ " sent pong msg to " + context.channel().remoteAddress()
+ " count: " + heartbeatCount
+ " current time: " + DateUtil.dateFormat());
}
protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("--- " + ctx.channel().remoteAddress() + " is active ---");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("--- " + ctx.channel().remoteAddress() + " is inactive ---");
}
protected void handleReaderIdle(ChannelHandlerContext ctx) {
System.err.println("--- READER_IDLE ---");
}
protected void handleWriterIdle(ChannelHandlerContext ctx) {
System.err.println("--- WRITER_IDLE ---");
}
protected void handleAllIdle(ChannelHandlerContext ctx) {
System.err.println("--- ALL_IDLE ---");
}
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
}
}
クライアントhandlerはサービス側の対応するデータを受信し、読み書きが空きpingパケットを送信し、tcp断線が再接続される.
public class ClientHandler extends CustomHeartbeatHandler {
private NettyClient client;
public ClientHandler(NettyClient client) {
super("client");
this.client = client;
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - Constant.PACK_BASE_LENGTH];
byteBuf.skipBytes(Constant.PACK_BASE_LENGTH);
byteBuf.readBytes(data);
System.out.println(name
+ " receive message: " + new String(data));
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingMsg(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
client.doConnect(); // tcp
}
}
クライアント起動クラス
public class NettyClient {
private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
private Channel channel;
private Bootstrap bootstrap;
public void start() {
try {
bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler(NettyClient.this));
}
});
this.doConnect();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected void doConnect() {
if (channel != null && channel.isActive()) {
return;
}
ChannelFuture future = bootstrap.connect("127.0.0.1", 9527);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
channel = futureListener.channel();
System.out.println("Connect to server successfully!");
} else {
System.out.println("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConnect();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
public void sendData() throws Exception {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < 10000; i++) {
if (channel != null && channel.isActive()) {
String content = "client req " + i;
ByteBuf buf = channel.alloc().buffer(Constant.PACK_BASE_LENGTH + content.getBytes().length);
buf.writeInt(Constant.PACK_BASE_LENGTH + content.getBytes().length);
buf.writeByte(CustomHeartbeatHandler.DATA_MSG);
buf.writeBytes(content.getBytes());
channel.writeAndFlush(buf);
System.out.println("client send message: " + content
+ " current time: " + DateUtil.dateFormat());
}
Thread.sleep(random.nextInt(20000));
}
}
public static void main(String[] args) throws Exception {
NettyClient client = new NettyClient();
client.start();
client.sendData();
}
}
サービス側handlerはクライアントメッセージを受信し、アイドル時にtcp接続を切断する
public class ServerHandler extends CustomHeartbeatHandler {
public ServerHandler() {
super("server");
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
byte[] data = new byte[buf.readableBytes() - Constant.PACK_BASE_LENGTH];
ByteBuf respBuf = Unpooled.copiedBuffer(buf);
buf.skipBytes(Constant.PACK_BASE_LENGTH);
buf.readBytes(data);
System.out.println(name
+ " receive message: " + new String(data)
+ " current time: " + DateUtil.dateFormat());
channelHandlerContext.write(respBuf);
}
@Override
protected void handleReaderIdle(ChannelHandlerContext ctx) {
super.handleReaderIdle(ctx);
System.err.println("--- client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it ---");
ctx.close();
}
}
サービス側起動クラス
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(10, 0, 0));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ServerHandler());
}
});
Channel ch = bootstrap.bind(9527).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}