Nettyの群発実装2

17740 ワード

次のコードでは、クライアントをシミュレートするためにtalentが必要です.採用するのはnetty 4.1.16 JDK1.8
私が使っているXshellのように実現できます
具体的なコードは以下のように実現される.
package qunfa;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public void bing(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup);
            b.channel(NioServerSocketChannel.class);
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.childHandler(new ChildChannelHandler());

            //     
            ChannelFuture f = b.bind(port).sync();

            //            
            f.channel().closeFuture().sync();

        } finally {
            //      
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        try {
            System.out.println("            ");
            new NettyServer().bing(2333);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 
  
package qunfa;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ChildChannelHandler extends ChannelInitializer {

    @Override
    protected void initChannel(SocketChannel e) throws Exception {

        System.out.println("  ");
        System.out.println("  :            ");
        System.out.println("IP:" + e.localAddress().getHostName());
        System.out.println("Port:" + e.localAddress().getPort());
        System.out.println("    ");

        //    
        //  "
" "\r
" ,
e.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 【 , LineBasedFrameDecoder, /r/n /n 】 //e.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, false, Delimiters.lineDelimiter())); // //e.pipeline().addLast(new FixedLengthFrameDecoder(4)); e.pipeline().addLast(new StringDecoder()); e.pipeline().addLast(new StringEncoder()); e.pipeline().addLast(new MyServerHanlder()); } }
 
  
package qunfa;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 *
 *    ChannelGroup        ,         
 *   ChannelGroup static 
 *   :          
 *
 */
public class MyChannelHandlerPool {

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}
 
  
package qunfa;

import java.util.Date;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

//public class MyServerHanlder extends ChannelHandlerAdapter{
public class MyServerHanlder extends ChannelInboundHandlerAdapter{

    /*
    * channelAction
    *
    * channel   
    * action    
    *
    *                ,          。                         
    *
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().localAddress().toString() + " channelActive");

        //   channelGroup    
        MyChannelHandlerPool.channelGroup.add(ctx.channel());

        //               
        String str = "           " + " " + ctx.channel().id() + new Date() + " " + ctx.channel().localAddress();
        ctx.writeAndFlush(str);
    }

    /*
    * channelInactive
    *
    * channel   
    * Inactive     
    *
    *                ,          。                            
    *
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        //  channelGroup   ,        ,  channel。
        MyChannelHandlerPool.channelGroup.remove(ctx.channel());

        System.out.println(ctx.channel().localAddress().toString() + " channelInactive");
    }

    /*
    * channelRead
    *
    * channel   
    * Read  
    *
    *               ,                
    *                ByteBuf            
    *
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //              ,         
        System.out.println(ctx.channel().id() + "" + new Date() + " " + msg);

        //           [              ]
        String str = "     :" + ctx.channel().id() + new Date() + " " + msg + "\r
"; // , MyChannelHandlerPool.channelGroup.writeAndFlush(str); } /* * channelReadComplete * * channel * Read * Complete * * , * ctx.flush() * */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /* * exceptionCaught * * exception * Caught * * , , , 、 * */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); System.out.println(" :\r
"+cause.getMessage()); } }