01 NIOネットワーク通信フレームNetty serverとclientのapi
9827 ワード
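Storm uses Netty underneath: the workers of a Storm topology exchange messages over Netty, so it is worth getting to know Netty.
Below is a hello-world example made of a Netty service (server) and a client; it shows the basic Netty API.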
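01: EchoServer — the service (server) side, which binds a port and waits for connections.
02: EchoClient — the client side, which connects to the service.
03: EchoClientHandler — the handler used by the client.
04: EchoServerHandler — the handler used by the server.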
//********************************** *********************************************************************************
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Minimal Netty echo server.
 *
 * <p>Binds a {@link NioServerSocketChannel} to the configured port and installs
 * an {@code EchoServerHandler} in the pipeline of every accepted connection.
 *
 * @author mosi
 */
public class EchoServer {
    /** Local TCP port the server binds to. */
    private final int port;

    /**
     * Creates a server that will listen on the given port once started.
     *
     * @param port local TCP port to bind
     */
    public EchoServer(int port) {
        this.port = port;
    }

    /**
     * Boots the server and blocks until the server channel is closed.
     *
     * <p>Steps performed:
     * <ul>
     *   <li>create a {@link ServerBootstrap} to bootstrap and bind the server;</li>
     *   <li>create a {@link NioEventLoopGroup} to drive the channels;</li>
     *   <li>set the local port the server binds to;</li>
     *   <li>register a {@code childHandler} that populates each accepted
     *       channel's pipeline;</li>
     *   <li>call {@code ServerBootstrap.bind()} and wait for it to complete.</li>
     * </ul>
     *
     * @throws Exception if binding fails or the calling thread is interrupted
     *     while waiting
     */
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /*
             * ServerBootstrap quick reference:
             *  - group(...)                 sets the EventLoopGroup(s)
             *  - channel(...)               sets the Channel class to instantiate
             *  - channelFactory(...)        sets a custom ChannelFactory instead
             *  - localAddress(...)          sets the address used by bind()
             *  - option(ChannelOption, T)   option for the server channel
             *                               (a null value removes the option)
             *  - childOption(ChannelOption, T) option for accepted channels
             *  - attr(AttributeKey, T)      attribute on the server channel
             *                               (a null value removes the attribute)
             *  - childAttr(AttributeKey, T) attribute on accepted channels
             *  - handler(ChannelHandler)    handler for the server channel
             *  - childHandler(ChannelHandler) handler for accepted channels
             *  - clone()                    copies this bootstrap's configuration
             *  - bind(...)                  creates the Channel and binds it
             */
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class) // NIO transport
                    .localAddress(port)
                    // Parameterized (not raw) initializer; invoked once for each
                    // accepted connection to set up its pipeline.
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });

            // Bind and block until the bind has completed.
            ChannelFuture f = b.bind().sync();
            System.out.println(EchoServer.class.getName()
                    + " started and listening on " + f.channel().localAddress());

            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut the event loop group down and wait until that has finished.
            group.shutdownGracefully().sync();
        }
    }

    /**
     * Starts the echo server on port 65535.
     *
     * @param args ignored
     * @throws Exception if the server fails to start
     */
    public static void main(String[] args) throws Exception {
        new EchoServer(65535).start();
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
 * Minimal Netty echo client.
 *
 * <p>Outline of what the client does:
 * <ul>
 *   <li>creates a {@link Bootstrap} to bootstrap the client;</li>
 *   <li>assigns an {@link EventLoopGroup} to the bootstrap;</li>
 *   <li>sets the remote {@link InetSocketAddress} to connect to;</li>
 *   <li>registers a handler that installs {@code EchoClientHandler} in the
 *       channel pipeline;</li>
 *   <li>calls {@code Bootstrap.connect()} to open the connection;</li>
 *   <li>finally shuts the {@link EventLoopGroup} down.</li>
 * </ul>
 *
 * @author mosi
 */
public class EchoClient {
    /** Host name or IP address of the server. */
    private final String host;
    /** TCP port of the server. */
    private final int port;

    /**
     * Creates a client for the given server endpoint.
     *
     * @param host host name or IP address of the server
     * @param port TCP port of the server
     */
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * <p>Bootstrap quick reference:
     * <ul>
     *   <li>{@code group(...)} sets the EventLoopGroup;</li>
     *   <li>{@code channel(...)} sets the Channel class to instantiate;</li>
     *   <li>{@code channelFactory(...)} sets a custom ChannelFactory instead;</li>
     *   <li>{@code localAddress(...)} sets the local address to bind;</li>
     *   <li>{@code option(ChannelOption, T)} sets a channel option
     *       (a null value removes it);</li>
     *   <li>{@code attr(AttributeKey, T)} sets a channel attribute
     *       (a null value removes it);</li>
     *   <li>{@code handler(ChannelHandler)} sets the channel's handler;</li>
     *   <li>{@code clone()} copies this bootstrap's configuration;</li>
     *   <li>{@code remoteAddress(...)} sets the address to connect to;</li>
     *   <li>{@code connect(...)} connects to the remote peer;</li>
     *   <li>{@code bind(...)} creates a Channel and binds it.</li>
     * </ul>
     *
     * @throws Exception if the connection fails or the calling thread is
     *     interrupted while waiting
     */
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    // The type argument is required: with a raw ChannelInitializer,
                    // initChannel(SocketChannel) does not override the erased
                    // initChannel(Channel) and the class does not compile.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });

            // Connect and block until the connection attempt has completed.
            ChannelFuture f = b.connect().sync();
            // Block until the channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut the event loop group down and wait until that has finished.
            group.shutdownGracefully().sync();
        }
    }

    /**
     * Connects to an echo server on 127.0.0.1:65535.
     *
     * @param args ignored
     * @throws Exception if the client fails to connect
     */
    public static void main(String[] args) throws Exception {
        new EchoClient("127.0.0.1", 65535).start();
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * Client-side business logic of the echo example.
 *
 * <p>Extends {@link SimpleChannelInboundHandler} and overrides three callbacks:
 * <ul>
 *   <li>{@code channelActive()} — called once the connection to the server is
 *       established;</li>
 *   <li>{@code channelRead0()} — called when data is received from the
 *       server;</li>
 *   <li>{@code exceptionCaught()} — called when an exception occurs during
 *       processing.</li>
 * </ul>
 *
 * <p>Why {@code SimpleChannelInboundHandler} rather than
 * {@code ChannelInboundHandlerAdapter}? With the adapter, the handler itself is
 * responsible for releasing a received message (e.g. {@code ByteBuf.release()}).
 * {@code SimpleChannelInboundHandler} releases the message for us after
 * {@code channelRead0} returns, which fits here because the client is done
 * with the message as soon as it has been printed.
 *
 * @author mosi
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * Sends the greeting as soon as the channel becomes active.
     *
     * @param ctx context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("active");
        // writeAndFlush, not write: write() only queues the message, so without
        // a flush the greeting would never reach the server.
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    /**
     * Prints the readable bytes of the received buffer as a hex dump.
     *
     * @param ctx context of this handler
     * @param msg buffer received from the server
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
            throws Exception {
        // Dump the message directly. Calling msg.readBytes(n) first would
        // allocate a second buffer that nobody releases.
        System.out.println("Client received: " + ByteBufUtil.hexDump(msg));
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx context of this handler
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Server-side business logic of the echo example.
 *
 * <p>Installed in the pipeline of each connection accepted by the server.
 * Only the callbacks needed here are overridden; the key one is
 * {@link #channelRead}, which receives the incoming messages.
 *
 * @author mosi
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the received message and writes it back to the sender.
     * The write is not flushed here.
     *
     * @param ctx context of this handler
     * @param msg the received message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        final String logLine = "Server received: " + msg;
        System.out.println(logLine);
        ctx.write(msg);
    }

    /**
     * Flushes the pending writes by writing an empty buffer, then closes the
     * channel once that write has completed.
     *
     * @param ctx context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Prints the stack trace of the failure and closes the channel through the
     * given context.
     *
     * @param ctx context of this handler
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}