JAva nioネットワークフレームワーク実装

6740 ワード

mavenプロジェクトhttps://github.com/solq360/common
  • チェーン符号化/復号化
  • リンク層チェーン処理
  • 配管管理socket
  • マルチプロトコル処理が非常に便利
  • netty NioEventLoopシングルスレッドシリアル処理
  • =========サポート機能:
  • 自動化符号化
  • rpcインタフェース強化
  • 使用
    簡単なチャットの例
    server
    TestNioServer
    //  session    
    ISessionFactory sessionFactory = new SessionFactory();
    //   /    
    ICoderParserManager coderParserManager = new CoderParserManager();
    //    /  ,    
    coderParserManager.register(CoderParser.valueOf("server chat", PackageDefaultCoder.valueOf(), new ChatTestServerHandle()));
    //  ServerSocket   
    ServerSocket serverSocket=ServerSocket.valueOf(SocketChannelConfig.valueOf(6969), 10,20,coderParserManager, sessionFactory);
    
    //    
    serverSocket.start();
    //      
    serverSocket.sync();
    //    
    serverSocket.stop();
    	

    client
    TestNioClient従来方式接続
    	//   /    
     	ICoderParserManager coderParserManager = new CoderParserManager();
     	//    /  ,    
    	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
    	//  ClientSocket   
    	final ClientSocket clientSocket = ClientSocket.valueOf(SocketChannelConfig.valueOf(6969), new SocketPool("client", null), coderParserManager, new EmptyHandle());
    
    	//          
    	Timer timer = new Timer();
    	timer.schedule(new TimerTask() {
    
    	    @Override
    	    public void run() {
    		clientSocket.send("       ");
    		System.out.println("send ");
    		this.cancel();
    	    }
    	}, 1000);
    	
    	//    
    	clientSocket.start();
    	//      
    	clientSocket.sync();
    	//    
    	clientSocket.stop();

    サーバ接続
    	//  session    
    	ISessionFactory sessionFactory = new SessionFactory();
    	//   /    
     	ICoderParserManager coderParserManager = new CoderParserManager();
     	//    /  ,    
    	coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
    	//  ClientSocket   
    	final ServerSocket serverSocket = ServerSocket.valueOf(SocketChannelConfig.valueOf(8888), 10, 20, coderParserManager, sessionFactory);
    
    	//          
    	Timer timer = new Timer();
    	timer.schedule(new TimerTask() {
    
    	    @Override
    	    public void run() {
    		System.out.println("registerClientSocket");
    		//       
    		ClientSocket clientSocket = serverSocket.registerClient(SocketChannelConfig.valueOf(6969));
    		clientSocket.send("       ");
    		this.cancel();
    	    }
    	}, 1000);
    	
    	//    
    	serverSocket.start();
    	//      
    	serverSocket.sync();
    	//    
    	serverSocket.stop();

    ソース実装プロセス
    チェーンコーディング
  • は、複数のICoderによる入出力変換処理
  • である.
  • CoderParserクラス組立複数のICoder
  • 符号化プロセッサ注意優先度
  • nio read -> packageCoder -> link coders -> handle
  • handle write -> link coders -> packageCoder -> nio write
  • ICoderParserManager管理呼び出し処理
  • public interface ICoderParserManager {
    
        /**
         *     
         * 
         * @return CoderResult
         * */
        CoderResult decode(ByteBuffer buffer, ICoderCtx ctx);
    
        /**
         *     
         * */
        ByteBuffer encode(Object message, ICoderCtx ctx);
    
        void error(ByteBuffer buffer, ICoderCtx ctx);
    
        /**     /     */
        void register(CoderParser coderParser);
    }

    コアdecodencode
      @Override
        public CoderResult decode(ByteBuffer buffer, ICoderCtx ctx) {
    	final SocketChannelCtx socketChannelCtx = (SocketChannelCtx) ctx;
    	final ClientSocket clientSocket = socketChannelCtx.getClientSocket();
    
    	for (CoderParser coderParser : coderParsers.values()) {
    	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
    	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
    	    final IHandle handle = coderParser.getHandle();
    	    Object value = null;
    	    synchronized (buffer) {
    		//     
    		if (socketChannelCtx.getCurrPackageIndex() >= buffer.limit()) {
    		    return CoderResult.valueOf(ResultValue.UNFINISHED);
    		}
    		//      
    		if (!packageCoder.verify(buffer, ctx)) {
    		    continue;
    		}
    		//    
    		value = packageCoder.decode(buffer, ctx);
    		if (value == null) {
    		    //      
    		    return CoderResult.valueOf(ResultValue.UNFINISHED);
    		}
    	    }
    	    //     
    	    if (linkCoders != null) {
    		for (ICoder coder : linkCoders) {
    		    value = coder.decode(value, ctx);
    		    if (value == null) {
    			throw new CoderException("     : " + coder.getClass());
    		    }
    		}
    	    }
    	    //       
    	    value = handle.decode(value, ctx);
    	    clientSocket.readBefore(socketChannelCtx, value);
    	    handle.handle(value, ctx);
    	    clientSocket.readAfter(socketChannelCtx, value);
    
    	    return CoderResult.valueOf(ResultValue.SUCCEED);
    	}
    	return CoderResult.valueOf(ResultValue.NOT_FIND_CODER);
        }
    
        @Override
        public ByteBuffer encode(Object message, ICoderCtx ctx) {
    
    	for (CoderParser coderParser : coderParsers.values()) {
    	    final IPackageCoder packageCoder = coderParser.getPackageCoder();
    	    final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
    	    final IHandle handle = coderParser.getHandle();
    	    //     
    	    if (!handle.verify(message, ctx)) {
    		continue;
    	    }
    	    //       
    	    Object value = handle.encode(message, ctx);
    	    //     
    	    if (linkCoders != null) {
    		for (int i = linkCoders.length - 1; i >= 0; i--) {
    		    ICoder coder = linkCoders[i];
    		    value = coder.encode(value, ctx);
    		    if (value == null) {
    			throw new CoderException("     : " + coder.getClass());
    		    }
    		}
    	    }
    	    //       
    	    value = packageCoder.encode(value, ctx);
    	    if (value != null) {
    		return (ByteBuffer) value;
    	    }
    	    throw new CoderException("      :" + packageCoder.getClass());
    	}
    
    	throw new CoderException("    /      ");
       }   
    
  • 半包/帖包処理:AbstractISocketChannel doReadメソッドの要約は、復号から返された状態に従って処理される.
  • 半パケット:完了状態でない場合、復号を継続し、最後のパケットインデックスから
  • の処理を開始する.
  • 帖包:パケット復号移動パケットインデックスが完了すると、次の復号処理
       boolean run = true;
    	    //     
    	    while (run) {
    		ByteBuffer cpbuffer = socketChannelCtx.coderBegin();
    		cpbuffer.mark();
    		CoderResult coderResult = coderParserManager.decode(cpbuffer, socketChannelCtx);
    		switch (coderResult.getValue()) {
    		case SUCCEED:
    		    break;
    		case NOT_FIND_CODER:
    		    final int readySize = socketChannelCtx.getWriteIndex() - socketChannelCtx.getCurrPackageIndex();
    		    final int headLimit = 255;
    		    if (readySize >= headLimit) {
    			throw new CoderException("    /      ");
    		    }
    		    run = false;
    		    break;
    		case UNFINISHED:
    		case UNKNOWN:
    		case ERROR:
    		default:
    		    run = false;
    		    // TODO throw
    		    break;
    		}
    	  }
  • に従う.
    未完侍加