Nettyインスタンスの数

183695 ワード

Nettyインスタンスの数
NettyはJDK NIOベースのネットワークフレームワーク
NIOプログラミングをシンプル化し、プログラムを使わずにselectorを自分でメンテナンスし、ネットワーク通信とデータ処理の部分を分離
下層のデータ通信、心拍検出(keepalived)に多く用いられる
 
1.データ通信
1.1 Hello World
public class Server {

    public static void main(String[] args) throws Exception {
        // 1           
        //                    
        //           (     )
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        // 2        ServerBootstrap,             
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup) //        
                .channel(NioServerSocketChannel.class) //   NIO   .NioServerSocketChannel  TCP, NioDatagramChannel  UDP
                .option(ChannelOption.SO_BACKLOG, 1024) //   TCP   
                .option(ChannelOption.SO_SNDBUF, 32 * 1024) //         
                .option(ChannelOption.SO_RCVBUF, 32 * 1024) //         
                .option(ChannelOption.SO_KEEPALIVE, true) //     
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {  //SocketChannel        
                        // 3                ,   addLast  ...
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });

        // 4     , bind  future(  ),   sync        
        ChannelFuture cf1 = b.bind(8765).sync();
        //ChannelFuture cf2 = b.bind(8764).sync();   //        
        // 5     ,   sync        
        cf1.channel().closeFuture().sync();
        //cf2.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

SO_BACKLOG詳細:サーバのTCPカーネルメンテナンスの2つのキューAとBクライアントがサービス側にconnectを要求した場合、SYN(1回目の握手)を送信サービス側がSYNを受け取った後、クライアントにSYN ACK(2回目の握手)を送信し、  TCPカーネルは、キューAクライアントに接続を入れて受信し、サービス側にACKを送信する(3回目の握手)、  TCPカーネルはA->B、acceptから接続を戻し、接続完了A/Bキューの長さとBACKLOGであり、accept速度が追いつかない場合、A/BキューがBACKLOGをいっぱいにすると、クライアント接続がTCPカーネルに拒否されてbacklogを大きくして緩和できるという現象を経験値~100
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("Server :" + body );
            String response = "         :" + body ;
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
            // future        ,         (   ).          ,    server   .        ctx[.channel()].close()
            //.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("   ");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
            throws Exception {
        ctx.close();
    }
}
public class Client {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception { 
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
        //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();  //        
        //    , Buffer  . write  flush   ,   writeFlush  
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
        Thread.sleep(2000);
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
        //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));
        
        cf1.channel().closeFuture().sync();
        //cf2.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClientHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("Client :" + body );
        } finally {
            //     xxxHandler      msg  :  (write)  , msg              ;       ,!         
             ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }
}

 
1.2パッケージの取り外しの問題
TCP/IPはパケットの転送,パケットの順序などを確保しているが,プログラミングではパケットの取り外し粘着問題の解決が必要である.
->受信した一連のパケットのデータ、処理の区切りはどこですか?  基本的なソリューション:
1)終了区切り文字として特殊文字
2)メッセージの長さを決める.パケットの長さを固定して、長さはスペースで補完するのに足りない.受信者はtrimを必要として、効率が高くなくて推薦しない
3)カスタムプロトコル.メッセージヘッダにメッセージの全長のフィールドを含める.セキュリティが必要な場合に考慮する.
特殊文字
public class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //   DelimiterBasedFrameDecoder       $_
                ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                //           .    StringDecoder, Handler        msg       String (   ByteBuffer).         ByteBuffer
                sc.pipeline().addLast(new StringDecoder());
                //          
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        //4     
        ChannelFuture cf = b.bind(8765).sync();
        
        //           
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server :" + msg);
        String response = "     : " + msg + "$_";
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        ctx.close();
    }
}
public class Client {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                sc.pipeline().addLast(new StringDecoder()); 
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));
        
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
        
    }
}
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String) msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

 
ていちょう
public class Server {

    public static void main(String[] args) throws Exception{
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //         ,    5,    5           
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                //          
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server :" + msg);
        String response =  request ;
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
    }
}
public class Client {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); 
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes()));
        cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes()));
        
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String response = (String) msg;
        System.out.println("Client: " + response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    }
}

 
1.3コーディング
すなわち、オブジェクトのネットワーク伝送とローカル持続化を実現するためにjavaのシーケンス化を用いると符号ストリームが大きいため、Marshalling,Kyro(Protobufベース)を用いることが多い
次の例では、符号化伝送javabean(Marshalling javabeanはserializableを実装する必要がある)を用いて、messageをgzip圧縮する
 
カスタムコーデック
public final class MarshallingCodeCFactory {
    //   
    public static MarshallingDecoder buildMarshallingDecoder() {
        //      ,   serial     java          
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //      ,    5 
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //               provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //       .       provider,                    ,        
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    //   
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //       .      Serializable   JavaBean         
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

javabean
public class Request implements Serializable {  //   Serializable  

    private String id ;
    private String name ;
    private String requestMessage ;
    private byte[] attachment;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getRequestMessage() {
        return requestMessage;
    }
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
    public byte[] getAttachment() {
        return attachment;
    }
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}
public class Response implements Serializable { //   Serializable  
    
    private String id;
    private String name;
    private String responseMessage;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getResponseMessage() {
        return responseMessage;
    }
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

GZip圧縮Util
public class GzipUtils {

    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(bos);
        gzip.write(data);
        gzip.finish();
        gzip.close();
        byte[] ret = bos.toByteArray();
        bos.close();
        return ret;
    }
    
    public static byte[] ungzip(byte[] data) throws Exception{
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        GZIPInputStream gzip = new GZIPInputStream(bis);
        byte[] buf = new byte[1024];
        int num = -1;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while((num = gzip.read(buf)) != -1 ){
            bos.write(buf, 0, num);
        }
        gzip.close();
        bis.close();
        byte[] ret = bos.toByteArray();
        bos.close();
        return ret;
    }
}

 
サービス側とクライアント
public class Server {
    public static void main(String[] args) throws Exception{
        
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         //    
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer() {
            protected void initChannel(SocketChannel sc) throws Exception {
                //      .         ,  Handler      msg                 
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request req = (Request)msg;
        System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
        byte[] attachment = GzipUtils.ungzip(req.getAttachment());
        
        String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "001.jpg";
        FileOutputStream fos = new FileOutputStream(path);
        fos.write(attachment);
        fos.close();
        
        Response resp = new Response();
        resp.setId(req.getId());
        resp.setName("resp" + req.getId());
        resp.setResponseMessage("    " + req.getId());
        ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class Client {
    public static void main(String[] args) throws Exception{
        
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        for(int i = 0; i < 5; i++){
            Request req = new Request();
            req.setId("" + i);
            req.setName("req" + i);
            req.setRequestMessage("    " + i);    
            String path = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "001.jpg";
            File file = new File(path);
            FileInputStream in = new FileInputStream(file);  
            byte[] data = new byte[in.available()];  
            in.read(data);  
            in.close(); 
            req.setAttachment(GzipUtils.gzip(data)); //  
            cf.channel().writeAndFlush(req);
        }

        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClientHandler extends ChannelHandlerAdapter{
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response resp = (Response) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

 
1.4長接続/短接続
1.長い接続で、一貫して接続が自発的に中断されず、リアルタイム性が強い2.短い接続である.データはキャッシュに入れ、一度にすべてのデータを一括して提出し、サービス側が受信した後に接続を閉じる以上の2種類は、ChannelHandlerContextにChannelFutureListener.celOSEリスナーを追加するかどうかによって実現する
3.長い接続、一定時間アクティブでない場合は接続を閉じる.SocketChannelにReadTimeoutHandler実装を追加する.例は以下の通りである.
public final class MarshallingCodeCFactory {
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
public class Request implements Serializable{
    private String id ;
    private String name ;
    private String requestMessage ;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getRequestMessage() {
        return requestMessage;
    }
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }
}
public class Response implements Serializable{
    private String id;
    private String name;
    private String responseMessage;
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getResponseMessage() {
        return responseMessage;
    }
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}

 
public class Server {

    public static void main(String[] args) throws Exception{
        
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         //    
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer() {
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ReadTimeoutHandler(5));  //   ,             
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response" + request.getId());
        response.setResponseMessage("    " + request.getId());
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class Client {
    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf ;

    //   
    private static class SingletonHolder { 
        static final Client instance = new Client();
    }
    public static Client getInstance(){
        return SingletonHolder.instance;
    }
    
    private Client(){
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        //  handler(                        ,      )
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));   //   5s,             
                        sc.pipeline().addLast(new ClientHandler());
                    }
            });
    }
    
    public void connect(){
        try {
            this.cf = b.connect("127.0.0.1", 8765).sync();
            System.out.println("         ,         ");                
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public ChannelFuture getChannelFuture(){
        if(this.cf == null) {   //    
            this.connect();
        }
        if(!this.cf.channel().isActive()){  //  
            this.connect();
        }
        return this.cf;
    }
    
    public static void main(String[] args) throws Exception{
        final Client c = Client.getInstance();
        
        ChannelFuture cf = c.getChannelFuture();
        for(int i = 1; i <= 3; i++ ){
            Request request = new Request();
            request.setId("" + i);
            request.setName("request" + i);
            request.setRequestMessage("    " + i);
            cf.channel().writeAndFlush(request);
            TimeUnit.SECONDS.sleep(4);  //  4s      
        }

        cf.channel().closeFuture().sync(); //       
        
        //                
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("         ");
                    ChannelFuture cf = c.getChannelFuture();
                    assert true == cf.channel().isActive(); //  
                    //      
                    Request request = new Request();
                    request.setId("" + 4);
                    request.setName("request" + 4);
                    request.setRequestMessage("    " + 4);
                    cf.channel().writeAndFlush(request);                    
                    cf.channel().closeFuture().sync();
                    System.out.println("     ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        System.out.println("    ,     ..");
    }
    
}
public class ClientHandler extends ChannelHandlerAdapter{
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response resp = (Response) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

 
1.5 UDP使用(少ない使用)
public class Server {
    public void run(int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioDatagramChannel.class)  // UDP: NioDatagramChannel
                .option(ChannelOption.SO_BROADCAST, true) //   
                .handler(new ServerHandler());
            b.bind(port).sync().channel().closeFuture().await();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Server().run(8765);
    }
}
public class ServerHandler extends SimpleChannelInboundHandler {

    //     
    private static final String[] DICTIONARY = { 
        "     ,     。",
        "       ,       。", 
        "       ,       。",
        "       ,       。",
        "    ,    。    ,    !"
    };

    private String nextQuote() {
        int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);
        return DICTIONARY[quoteId];
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        String req = packet.content().toString(CharsetUtil.UTF_8);
        System.out.println(req);
        if ("      ?".equals(req)) {
            ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("      : " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
        ctx.close();
        cause.printStackTrace();
    }
}
public class Client {

    public void run(int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ClientHandler());
            Channel ch = b.bind(0).sync().channel();
            //            UDP  
            ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("      ?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync();
            if (!ch.closeFuture().await(15000)) {
                System.out.println("    !");
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Client().run(8765);
    }
}
public class ClientHandler extends SimpleChannelInboundHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
        String response = msg.content().toString(CharsetUtil.UTF_8);
        if (response.startsWith("      : ")) {
            System.out.println(response);
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 
 
2.心拍数検査
クラスタ内のプライマリサーバはスレーブサーバの状態を知る必要があるためclientは5~10秒おきにサーバにハートビートパケットを送信する
nettyとタイミングタスクで実現可能
public final class MarshallingCodeCFactory {
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
public class RequestInfo implements Serializable {
    private String ip ;
    private HashMap cpuPercMap ;
    private HashMap memoryMap;
    
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public HashMap getCpuPercMap() {
        return cpuPercMap;
    }
    public void setCpuPercMap(HashMap cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }
    public HashMap getMemoryMap() {
        return memoryMap;
    }
    public void setMemoryMap(HashMap memoryMap) {
        this.memoryMap = memoryMap;
    }
}

 
public class Server {

    public static void main(String[] args) throws Exception{
        
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         //    
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer() {
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ServerHeartBeatHandler());
            }
        });
        
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
    
    private static HashMap AUTH_IP_MAP = new HashMap();
    private static final String SUCCESS_KEY = "auth_success_key";
    
    static {
        AUTH_IP_MAP.put("127.0.0.1", "1234");
    }
    
    private boolean auth(ChannelHandlerContext ctx, Object msg){
        String [] ret = ((String) msg).split(",");
        String auth = AUTH_IP_MAP.get(ret[0]);
        if(auth != null && auth.equals(ret[1])){
            //     ,       
            ctx.writeAndFlush(SUCCESS_KEY);
            return true;
        } else {
            ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
            return false;
        }
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof String){
            auth(ctx, msg);
        } else if (msg instanceof RequestInfo) {
            RequestInfo info = (RequestInfo) msg;
            System.out.println("--------------------------------------------");
            System.out.println("    ip : " + info.getIp());
            System.out.println("    cpu  : ");
            HashMap cpu = info.getCpuPercMap();
            System.out.println("    : " + cpu.get("combined"));
            System.out.println("     : " + cpu.get("user"));
            System.out.println("     : " + cpu.get("sys"));
            System.out.println("   : " + cpu.get("wait"));
            System.out.println("   : " + cpu.get("idle"));
            
            System.out.println("    memory  : ");
            HashMap memory = info.getMemoryMap();
            System.out.println("    : " + memory.get("total"));
            System.out.println("       : " + memory.get("used"));
            System.out.println("       : " + memory.get("free"));
            System.out.println("--------------------------------------------");
            
            ctx.writeAndFlush("info received!");
        } else {
            ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
        }
    }
}
public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                sc.pipeline().addLast(new ClienHeartBeatHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}
public class ClienHeartBeatHandler extends ChannelHandlerAdapter {

    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private ScheduledFuture> heartBeat;  //    
    //            
    private InetAddress addr ;
    private static final String SUCCESS_KEY = "auth_success_key";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        addr = InetAddress.getLocalHost();
        //String ip = addr.getHostAddress();
        String ip = "127.0.0.1";
        String key = "1234";
        //  
        String auth = ip + "," + key;
        //     
        ctx.writeAndFlush(auth);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if(msg instanceof String){
                String ret = (String) msg;
                if(SUCCESS_KEY.equals(ret)){
                    //          ,    5       
                    this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS);
                    System.out.println(msg);                
                } else {  
                    //           
                    System.out.println(msg);
                }
            }
        } finally {
            //   ,           
            ReferenceCountUtil.release(msg);
        }
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;
        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
        @Override
        public void run() {
            try {
                RequestInfo info = new RequestInfo();
                //ip
                info.setIp(addr.getHostAddress());
                Sigar sigar = new Sigar();
                //cpu prec
                CpuPerc cpuPerc = sigar.getCpuPerc();
                HashMap cpuPercMap = new HashMap();
                cpuPercMap.put("combined", cpuPerc.getCombined());
                cpuPercMap.put("user", cpuPerc.getUser());
                cpuPercMap.put("sys", cpuPerc.getSys());
                cpuPercMap.put("wait", cpuPerc.getWait());
                cpuPercMap.put("idle", cpuPerc.getIdle());
                // memory
                Mem mem = sigar.getMem();
                HashMap memoryMap = new HashMap();
                memoryMap.put("total", mem.getTotal() / 1024L);
                memoryMap.put("used", mem.getUsed() / 1024L);
                memoryMap.put("free", mem.getFree() / 1024L);
                info.setCpuPercMap(cpuPercMap);
                info.setMemoryMap(memoryMap);
                
                ctx.writeAndFlush(info);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            //             
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    }
}

 
3. HTTP
3.1 Hello World
public final class HttpHelloWorldServer {
  
      static final boolean SSL = System.getProperty("ssl") != null;
      static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
  
      public static void main(String[] args) throws Exception {
          final SslContext sslCtx;
          if (SSL) {
              SelfSignedCertificate ssc = new SelfSignedCertificate();
              sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
          } else {
              sslCtx = null;
          }
  
          EventLoopGroup bossGroup = new NioEventLoopGroup();
          EventLoopGroup workerGroup = new NioEventLoopGroup();
          try {
              ServerBootstrap b = new ServerBootstrap();
              b.option(ChannelOption.SO_BACKLOG, 1024);
              b.group(bossGroup, workerGroup)
               .channel(NioServerSocketChannel.class)
               .handler(new LoggingHandler(LogLevel.INFO))
               .childHandler(new HttpHelloWorldServerInitializer(sslCtx));
  
              Channel ch = b.bind(PORT).sync().channel();
  
              System.err.println("Open your web browser and navigate to " +
                      (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
  
              ch.closeFuture().sync();
          } finally {
              bossGroup.shutdownGracefully();
              workerGroup.shutdownGracefully();
          }
      }
}
public class HttpHelloWorldServerInitializer extends ChannelInitializer {

    private final SslContext sslCtx;

    public HttpHelloWorldServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        p.addLast(new HttpServerCodec());   // !  http  , HttpRequest HttpResponse
        p.addLast(new HttpHelloWorldServerHandler());
    }
}
public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter {
    private static final byte[] CONTENT = "HELLO WORLD".getBytes();

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;

            if (HttpHeaderUtil.is100ContinueExpected(req)) {
                ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
            }
            boolean keepAlive = HttpHeaderUtil.isKeepAlive(req);
            //     
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
            response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8");
            response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());

            if (!keepAlive) {
                // Request   ,        
                ctx.write(response).addListener(ChannelFutureListener.CLOSE);
            } else {
                //    , response    KEEP_ALIVE
                response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
                ctx.write(response);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

 
3.2 HTTPダウンロードファイル
public class HttpDownloadServer {

    private static final String DEFAULT_URL = "/sources/";

    public void run(final int port, final String url) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // addLast     key,     
                    // request   
                    ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                    // response    
                    ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                    // chunked,         response       
                    ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                    // ObjectAggregator,    response     FullHttpResponse
                    ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                    //        handler
                    ch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url));
                }
                });
            ChannelFuture future = b.bind("127.0.0.1", port).sync();
            System.out.println("HTTP         ,    : " + "http://localhost:"  + port + url);
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8765;
        new HttpDownloadServer().run(port, DEFAULT_URL);
    }
}

 
//        SimpleChannelInboundHandler,    ,         msg   
public class HttpDownoadServerHandler extends SimpleChannelInboundHandler {
    
    private final String url;

    public HttpDownoadServerHandler(String url) {
        this.url = url;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        //     (  )  
        if (!request.decoderResult().isSuccess()) {
            // 400
            sendError(ctx, BAD_REQUEST);
            return;
        }
        //          :    GET       
        if (request.method() != GET) {
            // 405
            sendError(ctx, METHOD_NOT_ALLOWED);
            return;
        }
        //    uri  
        final String uri = request.uri();
        // url    ,      
        final String path = parseURI(uri);
        //          , path null
        if (path == null) {
            //403
            sendError(ctx, FORBIDDEN);
            return;
        }
        
        //   file  
        File file = new File(path);
        //         
        if (file.isHidden() || !file.exists()) {
            // 404 
            sendError(ctx, NOT_FOUND);
            return;
        }
        //     
        if (file.isDirectory()) {
            if (uri.endsWith("/")) {
                //     "/"               :         
                sendListing(ctx, file);
            } else {
                //   "/"       ,      "/"     
                sendRedirect(ctx, uri + '/');
            }
            return;
        }
        //       file        
        if (!file.isFile()) {
            // 403
            sendError(ctx, FORBIDDEN);
            return;
        }
        
        //        
        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(file, "r");//           
        } catch (FileNotFoundException fnfe) {
            // 404
            sendError(ctx, NOT_FOUND);
            return;
        }
        
        //      
        long fileLength = randomAccessFile.length();
        //      
        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
        //      
        HttpHeaderUtil.setContentLength(response, fileLength);
        //  Content-Type
        setContentTypeHeader(response, file);
        //   KeepAlive
        if (HttpHeaderUtil.isKeepAlive(request)) {
            response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        //  response header, HttpObjectAggregator            
        ctx.write(response);
        
        //  ChunkedFile.   ChunkedFile    RandomAccessFile     .       8192   
        ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
        //      
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                if (total < 0) { 
                    System.err.println("Transfer progress: " + progress);
                } else {
                    System.err.println("Transfer progress: " + progress + " / " + total);
                }
            }
            @Override
            public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                System.out.println("Transfer complete.");
            }
        });
        
        //  Chunked,                 !
        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        //         Keep-Alive,           ,          
        if (!HttpHeaderUtil.isKeepAlive(request)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (ctx.channel().isActive()) {
           // 500
            sendError(ctx, INTERNAL_SERVER_ERROR);
            ctx.close();
        }
    }

    //    URI   
    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
    private String parseURI(String uri) {
        try {
            //  UTF-8   
            uri = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            try {
                //  ISO-8859-1
                uri = URLDecoder.decode(uri, "ISO-8859-1");
            } catch (UnsupportedEncodingException e1) {
                //         
                throw new Error();
            }
        }
        //  uri       :4     
        // step 1     
        if (!uri.startsWith(url)) {
            return null;
        }
        // step 2     
        if (!uri.startsWith("/")) {
            return null;
        }
        // step 3                        
        uri = uri.replace('/', File.separatorChar);
        // step 4        
        if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || 
                uri.startsWith(".") || uri.endsWith(".") || 
                INSECURE_URI.matcher(uri).matches()) {
            return null;
        }
        //           + URI            
        return System.getProperty("user.dir") + File.separator + uri;
    }
    
    //           
    private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
    //    ,  html  
    private static void sendListing(ChannelHandlerContext ctx, File dir) {
        //       
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
        //    
        response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
        //       
        StringBuilder ret = new StringBuilder();
        String dirPath = dir.getPath();
        ret.append("\r
"); ret.append(""<span style="color:#000000;">); ret.append(dirPath); ret.append(</span>" :"<span style="color:#000000;">); ret.append(</span>"\r
"); ret.append("

"); ret.append(dirPath).append(" :"); ret.append("

\r
"); ret.append("\r
"); // ByteBuf, ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8); // response.content().writeBytes(buffer); // ByteBuf buffer.release(); // ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } // private static void sendRedirect(ChannelHandlerContext ctx, String newUri) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND); response.headers().set(LOCATION, newUri); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } // private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString()+ "\r
", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private static void setContentTypeHeader(HttpResponse response, File file) { // mime Content-Type MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap(); response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath())); } }

 
3.3 HTTPアップロードファイル(少ない使用)
実際のアプリケーションでは、ファイルアップロードサービス側に成熟したフレームワークfastDFS(小さなファイル)とHDFS(大きなファイル)があります.
ブレークポイントの継続を実現するには、アップロードの進捗を記録する必要があります.HTTPヘッダのRangeとContent-Langeを参照してください.
public final class HttpUploadServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.handler(new LoggingHandler(LogLevel.INFO));
            b.childHandler(new HttpUploadServerInitializer(sslCtx));

            Channel ch = b.bind(PORT).sync().channel();
            System.err.println("Open your web browser and navigate to " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
public class HttpUploadServerInitializer extends ChannelInitializer {

    private final SslContext sslCtx;

    public HttpUploadServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }
        pipeline.addLast(new HttpRequestDecoder());
        pipeline.addLast(new HttpResponseEncoder());
        //   
        pipeline.addLast(new HttpContentCompressor());
        pipeline.addLast(new HttpUploadServerHandler());
    }
}
public class HttpUploadServerHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());

    private HttpRequest request;

    private boolean readingChunks;

    private final StringBuilder responseContent = new StringBuilder();

    private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); //     minsize    

    private HttpPostRequestDecoder decoder;
    
    static {
        DiskFileUpload.deleteOnExitTemporaryFile = true; //           
        DiskFileUpload.baseDirectory = "D:" + File.separatorChar + "aa";  //      
        
        DiskAttribute.deleteOnExitTemporaryFile = true; //           
        DiskAttribute.baseDirectory = "D:" + File.separatorChar + "aa"; //      
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (decoder != null) {
            decoder.cleanFiles();
        }
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {           // HttpRequest   
            HttpRequest request = this.request = (HttpRequest) msg;
            URI uri = new URI(request.uri());
            if (!uri.getPath().startsWith("/form")) {
                //       
                writeMenu(ctx);
                return;
            }
            //       
            responseContent.setLength(0);
            responseContent.append("WELCOME TO THE WILD WILD WEB SERVER\r
"); responseContent.append("===================================\r
"); responseContent.append("VERSION: " + request.protocolVersion().text() + "\r
"); responseContent.append("REQUEST_URI: " + request.uri() + "\r
\r
"); responseContent.append("\r
\r
"); for (Entry entry : request.headers()) { responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r
"); } responseContent.append("\r
\r
"); Set cookies = null; String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = ServerCookieDecoder.decode(value); } for (Cookie cookie : cookies) { responseContent.append("COOKIE: " + cookie + "\r
"); } responseContent.append("\r
\r
"); QueryStringDecoder decoderQuery = new QueryStringDecoder(request.uri()); Map> uriAttributes = decoderQuery.parameters(); for (Entry> attr: uriAttributes.entrySet()) { for (String attrVal: attr.getValue()) { responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "\r
"); } } responseContent.append("\r
\r
"); // GET , return if (request.method().equals(HttpMethod.GET)) { responseContent.append("\r
\r
END OF GET CONTENT\r
"); return; } // POST try { decoder = new HttpPostRequestDecoder(factory, request); } catch (ErrorDataDecoderException e1) { e1.printStackTrace(); responseContent.append(e1.getMessage()); writeResponse(ctx.channel()); ctx.channel().close(); return; } readingChunks = HttpHeaderUtil.isTransferEncodingChunked(request); responseContent.append("Is Chunked: " + readingChunks + "\r
"); responseContent.append("IsMultipart: " + decoder.isMultipart() + "\r
"); if (readingChunks) { responseContent.append("Chunks: "); } } if (decoder != null) { if (msg instanceof HttpContent) { //HttpContent // chunk HttpContent chunk = (HttpContent) msg; try { decoder.offer(chunk); } catch (ErrorDataDecoderException e1) { e1.printStackTrace(); responseContent.append(e1.getMessage()); writeResponse(ctx.channel()); ctx.channel().close(); return; } responseContent.append('o'); // chunk 'o' readHttpDataChunkByChunk(); // chunk if (chunk instanceof LastHttpContent) { writeResponse(ctx.channel()); readingChunks = false; reset(); } } } else { writeResponse(ctx.channel()); } } private void reset() { request = null; decoder.destroy(); // decoder = null; } private void readHttpDataChunkByChunk() throws Exception { try { while (decoder.hasNext()) { InterfaceHttpData data = decoder.next(); if (data != null) { try { writeHttpData(data); } finally { data.release(); } } } } catch (EndOfDataDecoderException e1) { responseContent.append("\r
\r
END OF CONTENT CHUNK BY CHUNK\r
\r
"); } } private void writeHttpData(InterfaceHttpData data) throws Exception { if (data.getHttpDataType() == HttpDataType.Attribute) { Attribute attribute = (Attribute) data; String value = null; try { value = attribute.getValue(); } catch (IOException e1) { e1.printStackTrace(); responseContent.append("\r
BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " Error while reading value: " + e1.getMessage() + "\r
"); return; } if (value.length() > 100) { responseContent.append("\r
BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " data too long\r
"); } else { responseContent.append("\r
BODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute + "\r
"); } } else { responseContent.append("\r
-----------start-------------" + "\r
"); responseContent.append("\r
BODY FileUpload: " + data.getHttpDataType().name() + ": " + data + "\r
"); responseContent.append("\r
------------end------------" + "\r
"); if (data.getHttpDataType() == HttpDataType.FileUpload) { FileUpload fileUpload = (FileUpload) data; if (fileUpload.isCompleted()) { System.out.println("file name : " + fileUpload.getFilename()); System.out.println("file length: " + fileUpload.length()); System.out.println("file maxSize : " + fileUpload.getMaxSize()); System.out.println("file path :" + fileUpload.getFile().getPath()); System.out.println("file absolutepath :" + fileUpload.getFile().getAbsolutePath()); System.out.println("parent path :" + fileUpload.getFile().getParentFile()); if (fileUpload.length() < 1024 * 1024 * 10) { responseContent.append("\tContent of file\r
"); try { responseContent.append(fileUpload.getString(fileUpload.getCharset())); } catch (Exception e1) { e1.printStackTrace(); } responseContent.append("\r
"); } else { responseContent.append("\tFile too long to be printed out:" + fileUpload.length() + "\r
"); } fileUpload.renameTo(new File(fileUpload.getFile().getPath())); // , decoder.removeHttpDataFromClean(fileUpload); } else { responseContent.append("\tFile to be continued but should not!\r
"); } } } } private void writeResponse(Channel channel) { ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8); responseContent.setLength(0); // boolean close = request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true) || request.protocolVersion().equals(HttpVersion.HTTP_1_0) && !request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); // Content-Length if (!close) { response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); } Set cookies = null; String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = ServerCookieDecoder.decode(value); } if (!cookies.isEmpty()) { for (Cookie cookie : cookies) { response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.encode(cookie)); } } ChannelFuture future = channel.writeAndFlush(response); if (close) { future.addListener(ChannelFutureListener.CLOSE); } } // html private void writeMenu(ChannelHandlerContext ctx) { responseContent.setLength(0); // create Pseudo Menu responseContent.append(""); responseContent.append(""); responseContent.append("Netty Test Form\r
"); responseContent.append("\r
"); responseContent.append(""); responseContent.append(""); responseContent.append(""); responseContent.append(""); responseContent.append(""); responseContent.append("
"); responseContent.append("

Netty Test Form

"); responseContent.append("Choose one FORM"); responseContent.append("
\r
"); // GET responseContent.append("
GET FORM
"); responseContent.append("
"); responseContent.append(""); responseContent.append(""); responseContent.append(""); responseContent.append("
Fill with value:
Fill with value:
"); responseContent.append("
Fill with value: