Nettyはサービス側でありクライアントであり、サービス側とクライアントは互いに対応している.

6332 ワード

Nettyはサービス側でありクライアントであり、サービス側とクライアントは互いに対応している.
具体的な機能の詳細は次のとおりです.
上流にはミドルウェアプラットフォームにメッセージを送信するサービスがあります.ミドルウェアプラットフォームにはサービス側もクライアントもあります.下流クライアントを介して接続されたクライアントとミドルウェアを介して伝送されるチャネルidは同じであり、互いに対応するチャネルを実現し、伝送されたメッセージが同じ道を歩んでいることを保証する.では、同時に1つのサービス側と1つのクライアントを備え、下流のクライアントがサービス側に接続しなければならない.すなわち、ミドルウェアプラットフォームはメッセージ変換のプラットフォームとしてのみ機能し、変換を行わない透過機能を達成する.
上記の機能の詳細については、上流から下へ送信されたデータと自分自身の接続データおよび下流クライアントの接続データをグローバルmapで格納し、mapで対応するチャネルidとデータを取得することを想定している.
ステップ1:ダウンストリームクライアントの接続を傍受し、クライアント接続を作成する
public class NettyClientListener implements ApplicationRunner {

    @Autowired
    NettyClient NettyClient;

    @Override
    public void run(ApplicationArguments args) {
        log.info("Netty Client connection is starting ........................................");
        //                   
        Channel channel = cGSNettyClient.newClient();
        CommonDataContext.putChannel(CommonDataContext.CGS_CHANNEL_KEY, channel);
        log.info("Netty Client channel id = {}", channel.id());
    }
}

ステップ2:新しい接続を初期化する
public class NettyClient {

    public void init() {
        client = new Bootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        //       
        client.channel(NioSocketChannel.class);
        client.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true);
        client.group(group);
        client.remoteAddress(host, port);
        // NioSocketChannel   handler,      
        client.handler(new ChannelInitializer() { //    NioSocketChannel
            @Override
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(5, 5, 0));
                ch.pipeline().addLast(new ByteArrayDecoder());
                //   handler
                ch.pipeline().addLast(new ClientHandler(applicationContext));
            }
        });
    }

    public Channel newClient() {
        try {
            if (client == null) {
                init();
            }
            //      
            ChannelFuture channelFuture = client.connect(host, port).sync();
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture arg0) throws Exception {
                    if (channelFuture.isSuccess()) {
                        log.info("Netty client connection is successful ........................................");
                    } else {
                        log.info("Netty client connection is failed ........................................");
                        channelFuture.cause().printStackTrace();
                    }
                }
            });
            return channelFuture.channel();
        } catch (Exception e) {
            log.error("Netty client connection is failed ........................................, because: {}", e.getMessage());
            return null;
        }
    }
}

第三部:チャネル接続、読み書き、破棄
public class YJServerHandler extends ChannelInboundHandlerAdapter {

	/***
	 *     (    )
	 * @param ctx
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) {
		Channel hardwareChannel = ctx.channel();
		//     ID
		String hardwareChannelId = ctx.channel().id().asLongText();
		//            
		Channel ccChannel = CommonDataContext.getChannel(hardwareChannelId);
		//   ID          ID
		String ccChannelId = ccChannel.id().asLongText();

		CommonDataContext.removeChannel(hardwareChannelId);
		CommonDataContext.removeChannel(ccChannelId);

		ccChannel.disconnect();
		hardwareChannel.disconnect();
	}

	/**
	 *     ,          ,                ;
	 * @param ctx
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		NettyClient NettyClient = SpringContextUtils.getBean(NettyClient.class);
		Channel channel = ctx.channel();
		//          ,(   )        ;
		Channel cccClientChannel = ccNettyClient.newClient();
		CommonDataContext.putChannel(channel.id().asLongText(), ccClientChannel);
		CommonDataContext.putChannel(ccClientChannel.id().asLongText(), channel);
		log.info("logictis channelActive ChannelMap = {},          :{}", CommonDataContext.getChannelMap(), ctx.channel().remoteAddress());
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//        
		Channel channel = CommonDataContext.getChannel(ctx.channel().id().asLongText());
		//log.info("SocketHandler channel = {}", channel);
		channel.writeAndFlush(msg);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		super.userEventTriggered(ctx, evt);
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			if (IdleState.READER_IDLE.equals(event.state())) {
				log.warn("         ");
				//         
			} else if (IdleState.WRITER_IDLE.equals(event.state())) {
				log.warn("        ");
				//      
				// ctx.writeAndFlush(MessageProto.Message.newBuilder().setType(1));
			} else if (event.state().equals(IdleState.ALL_IDLE)) {
				//      
				log.warn("           ");
			}
		}
	}
}

第四部:モニタリング
public class XYServerPipeline extends ChannelInitializer {

	@Override
	public void initChannel(SocketChannel ch) {
		ChannelPipeline pipeline = ch.pipeline();

		pipeline.addLast(new ByteArrayEncoder());
		//        ,    Channel         ,      
		pipeline.addLast("idleStateHandler", new IdleStateHandler(180, 180, 180, TimeUnit.SECONDS));
		pipeline.addLast(new XYServerHandler());
	}
}

ステップ5:ビジネス読み取りを実現し、下流クライアントに送信
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //        ,        ,      ;
            Channel channel = CommonDataContext.getChannel(ctx.channel().id().asLongText());
            log.info("ClientHandler channel = {}", channel);
            channel.writeAndFlush(msg);
            return;
}