Netty紹介(2)————マルチチャットルームでDomeを実現


gitアドレス:https://github.com/lkj41110/netty_dome

サービス側

/**
 *  
 * @author lkj41110
 * @version time:2017 1 16   9:54:55
 */
public class ServerMain {

   private int port;

   public ServerMain(int port) {
      this.port = port;
   }

   public static void main(String[] args) {
      new ServerMain(2000).run();
   }

   public void run() {
      EventLoopGroup acceptor = new NioEventLoopGroup();
      EventLoopGroup worker = new NioEventLoopGroup();
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.option(ChannelOption.SO_BACKLOG, 1024);

      bootstrap.group(acceptor, worker);// , , IO
       bootstrap.channel(NioServerSocketChannel.class);// socketchannel 
       bootstrap.childHandler(new ServerIniterHandler());// accept channel pipeline 
        try {
         //  
         Channel channel = bootstrap.bind(port).sync().channel();
         System.out.println("server strart running in port:" + port);

         //  
         channel.closeFuture().sync();
      } catch (InterruptedException e) {
         e.printStackTrace();
      } finally {
         //  
         acceptor.shutdownGracefully();
         worker.shutdownGracefully();
      }
   }
}

サービス側が起動するときは、まずサーバBootstrapインスタンスを作成します.次に、内蔵の転送モードをバインドする必要があります.NettyにはNIO,Epoll,OIO,JVM内部伝送の伝送モードが内蔵されており,それぞれNioEventLoopGroup,EpollEventLoopGroup,OioEventLoopGroup,LocalEventLoopGroupの4つのモードに対応している.一般的にはNIOを使用しているので、NioEventLoopGroupを使用しています.
EventLoopGroup acceptor = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
// NIO。
bootstrap.group(acceptor, worker);
bootstrap.channel(NioServerSocketChannel.class);

上のコードからbootstrapが2つのNioEventLoopGroup(スレッドプールと見なすことができる)をバインドしているのを見ました.これはReactor主従スレッドモデルで、新しい接続を受け入れ、処理し、ChannelのタイプをNioServer-SocketChannelに指定します.その後、ローカルアドレスを選択したポートを持つInetSocket-Addressに設定します.サーバはこのアドレスにバインドされ、新しい接続要求を傍受します.

public class ServerIniterHandler extends  ChannelInitializer<SocketChannel> {

   @Override
   protected void initChannel(SocketChannel arg0) throws Exception {
      ChannelPipeline pipeline = arg0.pipeline();
      pipeline.addLast("decode",new StringDecoder());
      pipeline.addLast("encode",new StringEncoder());
      pipeline.addLast("chat",new ChatServerHandler());

   }
}

ServerIniterHandlerでは主にChannelPipelineチェーンをバインドし、StringDecoderを設定し、StringEncoderが受信した情報がStringタイプではなく、ChatServerHandlerビジネス処理クラスを設定し、サービス側のビジネス処理ロジックは主にChatServerHandlerによって実現されます.

/**
 *  
 */
public class ChatServerHandler extends SimpleChannelInboundHandler {

    // 
    public static final ChannelGroup group = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext arg0, String arg1)
            throws Exception {
        Channel channel = arg0.channel();
        // , 
        for (Channel ch : group) {
            if (ch == channel) {
                ch.writeAndFlush("[you]:" + arg1 + "
"
); } else { ch.writeAndFlush( "[" + channel.remoteAddress() + "]: " + arg1 + "
"
); } } System.out.println("[" + channel.remoteAddress() + "]: " + arg1 + "
"
); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); for (Channel ch : group) { ch.writeAndFlush( "[" + channel.remoteAddress() + "] " + "is comming"); } group.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); for (Channel ch : group) { ch.writeAndFlush( "[" + channel.remoteAddress() + "] " + "is comming"); } group.remove(channel); } // @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("[" + channel.remoteAddress() + "] " + "online"); ctx.writeAndFlush("[server]: welcome"); } // @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("[" + channel.remoteAddress() + "] " + "offline"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println( "[" + ctx.channel().remoteAddress() + "]" + "exit the room"); ctx.close().sync(); } }

クライアントのコードも基本的にサービス側と似ており、最大の違いはクライアントの起動コードが作成されたBootstrapの例であり、NioEventLoopGroupのスレッドプールが1つだけ設定されて接続を処理することである.

クライアント

/**
 *  ( , )
 * @author lkj41110
 * @version time:2017 1 16   9:55:41
 */
public class ClientMain {
   private String host;
   private int port;
   private boolean stop = false;

   public ClientMain(String host, int port) {
      this.host = host;
      this.port = port;
   }

   public static void main(String[] args) throws IOException {
      new ClientMain("127.0.0.1", 2000).run();
   }

   public void run() throws IOException {
       // worker , 
      EventLoopGroup worker = new NioEventLoopGroup();
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(worker);
      //  NIO   Channel
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.handler(new ClientIniterHandler());

      try {
          //     
         Channel channel = bootstrap.connect(host, port).sync().channel();
         while (true) {
             // 
            BufferedReader reader = new BufferedReader(
                  new InputStreamReader(System.in));
            String input = reader.readLine();
            if (input != null) {
               if ("quit".equals(input)) {
                  System.exit(1);
               }
               channel.writeAndFlush(input);
            }
         }
      } catch (InterruptedException e) {
         e.printStackTrace();
         System.exit(1);
      }
   }

}

public class ClientIniterHandler extends ChannelInitializer<SocketChannel> {

   @Override
   protected void initChannel(SocketChannel arg0) throws Exception {
      ChannelPipeline pipeline = arg0.pipeline();
      pipeline.addLast("stringD", new StringDecoder());
      pipeline.addLast("stringC", new StringEncoder());
      pipeline.addLast("http", new HttpClientCodec());
      pipeline.addLast("chat", new ChatClientHandler());
   }

}

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

   @Override
   protected void channelRead0(ChannelHandlerContext arg0, String arg1)
         throws Exception {
       // 
      System.out.println(arg1);
   }

}
 
// 。 
server strart running in port:2000
[/127.0.0.1:60628] online
[/127.0.0.1:60628]: 123

[/127.0.0.1:60628]: 12312

[/127.0.0.1:60628]: 312312

[/127.0.0.1:60744] online
[/127.0.0.1:60744]: 123123

[/127.0.0.1:60744]: 1231231

[/127.0.0.1:60744] offline
[/127.0.0.1:60628]: 1232131

[/127.0.0.1:60628] offline

// 
[server]: welcome
123
[you]:123

12312
[you]:12312

312312
[you]:312312

[/127.0.0.1:60744] is comming
[/127.0.0.1:60744]: 123123

[/127.0.0.1:60744]: 1231231

[/127.0.0.1:60744] is comming
1232131
[you]:1232131

以上は簡単なnetty Domeです.nettyの重要なコンポーネントが使用されています(後で一つ一つ分析します):
  • Reactorスレッドモデル
  • Bootstrap or ServerBootstrap
  • EventLoop
  • EventLoopGroup
  • ChannelPipeline
  • Channel
  • Future or ChannelFuture
  • ChannelInitializer
  • ChannelHandler