01 NIOネットワーク通信フレームNetty serverとclientのapi

9827 ワード

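Storm uses Netty for its underlying communication, so understanding how a Storm topology exchanges messages is a good reason to learn Netty first.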

This note builds a Netty server and client in hello-world style to introduce the basic Netty API.

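01: EchoServer — the server; it binds to a port and installs the server handler on each accepted connection.

02: EchoClient — the client; it connects to the server.

03: EchoClientHandler — the client-side handler (sends the greeting and prints the reply).

04: EchoServerHandler — the server-side handler (echoes what it receives).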

//**********************************     *********************************************************************************
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * netty   
 * 
 * @author mosi
 * 
 */
public class EchoServer {
	private final int port;

	public EchoServer(int port) {
		this.port = port;
	}

	/**
	 * •   ServerBootstrap              
* • NioEventLoopGroup , 、 、
* • InetSocketAddress,
* • childHandler
* • , ServerBootstrap.bind()
* * @throws Exception */ public void start() throws Exception { // NioEventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { /** * ServerBootstrap
* • group(...), EventLoopGroup
* • channel(...),
* • channelFactory(...), ChannelFactory
* • localAddress(...), , bind(...) connect(...)
* • option(ChannelOption, * T), , null, ChannelOption
* • childOption(ChannelOption, T),
* • attr(AttributeKey, T), Channel, null,
* • childAttr(AttributeKey, T),
* • handler(ChannelHandler), ChannelHandler
* • childHandler(ChannelHandler), ChannelHandler
* • clone(), ServerBootstrap,
* • bind(...), Channel
*/ ServerBootstrap b = new ServerBootstrap(); // Specifies NIO transport, local socket address // Adds handler to channel pipeline b = b.group(group);// NioEventLoopGroup b = b.channel(NioServerSocketChannel.class);// NioServerSocketChannel b = b.localAddress(port);// InetSocketAddress 。 // , childHandler ChannelHandler b.childHandler(new ChannelInitializer() {// ChannelInitializer // ChannelHandler @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); // Binds server, waits for server to close, and releases resources // , sync() ChannelFuture f = b.bind().sync(); System.out.println(EchoServer.class.getName() + "started and listen on “" + f.channel().localAddress()); // , sync(), f.channel().closeFuture().sync(); } finally { // EventLoopGroup , 。 group.shutdownGracefully().sync();// sync(), } } public static void main(String[] args) throws Exception { new EchoServer(65535).start(); } }
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


import java.net.InetSocketAddress;


/**
 * Netty echo client.
 *
 * <p>Steps performed by {@link #start()}:
 * <ul>
 * <li>create a {@link Bootstrap} to bootstrap the client</li>
 * <li>create an {@link EventLoopGroup} and hand it to the bootstrap</li>
 * <li>set the remote {@link InetSocketAddress} to connect to</li>
 * <li>register a handler that installs {@code EchoClientHandler} on the channel</li>
 * <li>call {@code Bootstrap.connect()} to connect to the server</li>
 * <li>shut the {@link EventLoopGroup} down when the channel has closed</li>
 * </ul>
 *
 * @author mosi
 */
public class EchoClient {
	private final String host;
	private final int port;

	/**
	 * @param host host name or address of the server
	 * @param port port of the server
	 */
	public EchoClient(String host, int port) {
		this.host = host;
		this.port = port;
	}

	/**
	 * Connects to the server and blocks until the channel is closed.
	 *
	 * <p>Bootstrap configuration methods (see the Netty API docs):
	 * <ul>
	 * <li>{@code group(...)}: sets the EventLoopGroup</li>
	 * <li>{@code channel(...)}: sets the Channel class to instantiate</li>
	 * <li>{@code channelFactory(...)}: sets a ChannelFactory used to create channels</li>
	 * <li>{@code localAddress(...)}: local address to bind; may instead be given to bind(...) or connect(...)</li>
	 * <li>{@code option(ChannelOption, T)}: channel option; a null value removes it</li>
	 * <li>{@code attr(AttributeKey, T)}: channel attribute; a null value removes it</li>
	 * <li>{@code handler(ChannelHandler)}: handler for the channel</li>
	 * <li>{@code clone()}: copy of this Bootstrap with the same settings</li>
	 * <li>{@code remoteAddress(...)}: remote address to connect to</li>
	 * <li>{@code connect(...)}: connects to the remote peer</li>
	 * <li>{@code bind(...)}: creates a Channel and binds it</li>
	 * </ul>
	 *
	 * @throws Exception if connecting, waiting on the close future, or shutting
	 *                   down the event loop group fails or is interrupted
	 */
	public void start() throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			b.group(group)
					.channel(NioSocketChannel.class)
					.remoteAddress(new InetSocketAddress(host, port))
					// The type argument must match initChannel's parameter type,
					// otherwise the method does not override the abstract one.
					.handler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast(new EchoClientHandler());
						}
					});

			// sync() blocks until the connect attempt has completed.
			ChannelFuture f = b.connect().sync();
			// Block until the channel is closed.
			f.channel().closeFuture().sync();
		} finally {
			group.shutdownGracefully().sync();
		}
	}

	/**
	 * Connects to an echo server on 127.0.0.1:65535.
	 *
	 * @param args ignored
	 * @throws Exception propagated from {@link #start()}
	 */
	public static void main(String[] args) throws Exception {
		new EchoClient("127.0.0.1", 65535).start();
	}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;


/**
 * Client-side handler for the echo example.
 *
 * <p>Extends {@link SimpleChannelInboundHandler} and overrides three methods:
 * <ul>
 * <li>{@code channelActive()}: sends the greeting when the channel becomes active</li>
 * <li>{@code channelRead0()}: prints the data received from the server</li>
 * <li>{@code exceptionCaught()}: prints the error and closes the channel</li>
 * </ul>
 *
 * <p>Why {@code SimpleChannelInboundHandler} here rather than
 * {@code ChannelInboundHandlerAdapter}: per the Netty documentation,
 * {@code SimpleChannelInboundHandler} releases the message after
 * {@code channelRead0} returns, which suits a handler that is done with the
 * message at that point. A handler that passes the received message on to an
 * asynchronous write must not have it released on return, which is the usual
 * reason to extend {@code ChannelInboundHandlerAdapter} instead.
 *
 * @author mosi
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

	/**
	 * Sends the greeting once the channel is active.
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("active");
		// write() alone only queues the message; flush so it actually goes out.
		ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
	}

	/**
	 * Prints a hex dump of the readable bytes of the received buffer.
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
			throws Exception {
		// hexDump(ByteBuf) dumps the readable bytes directly; readBytes(int)
		// would allocate a new buffer that nobody releases.
		System.out.println("Client received: " + ByteBufUtil.hexDump(msg));
	}

	/**
	 * Prints the stack trace and closes the channel.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * Server-side inbound handler for the echo example.
 *
 * <p>Each received message is printed and written back to the same channel.
 * When the current read batch is complete, pending writes are flushed and the
 * channel is closed. Errors are printed and also close the channel.
 *
 * @author mosi
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

	/**
	 * Prints the received message and queues it to be written back.
	 * The write is not flushed here.
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		String received = "Server received: " + msg;
		System.out.println(received);
		ctx.write(msg);
	}

	/**
	 * Flushes an empty buffer and closes the channel once that write-and-flush
	 * operation has completed.
	 */
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
				.addListener(ChannelFutureListener.CLOSE);
	}

	/**
	 * Prints the stack trace of the error and closes the channel.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}