Java NIOチュートリアルSelector

9930 ワード

今回は、非ブロッキングI/OのSelectorについてお話しします.非ブロッキングTCPとUDPを組み合わせて使用する必要があります.まず,TCPとUDPの非ブロッキングチャネルについて簡単に述べる.
非ブロックI/Oチャネル
コードをつける前に、最も基本的な知識を説明します.TCPとUDPは、SocketChannel、Server SocketChannel、DatagramChannelの3つのチャネルに対応しています.チャンネルを通ることができますOpen()メソッドで初期化します.また、SocketChannelでは、新しい接続がServer SocketChannelに到達すると作成されます(コードに記載されています).また、使用が終了すると閉じる必要があります.
まずはSocketChannelの基本操作を見てみましょう
//  open()  SocketChannel
SocketChannel socketChannel = SocketChannel.open();
//      
socketChannel.connect(new InetSocketAddress("127.0.0.1", 18888));
//        
socketChannel.configureBlocking(false);
while(! socketChannel.finishConnect() ){
	//     
}
//   SocketChannel      

次に、SocketChannelでデータ操作を行う方法について説明します.そのデータの読み書きは他のチャネルの読み書き方式と完全に一致しているが、非ブロックモードではread()とwrite()は何もせずに返されるので、ループで呼び出し、戻り値に注意しなければならない.
ByteBuffer buf = ByteBuffer.allocate(48);
while(socketChannel.read(buf)!=-1) {
	buf.flip();
	while(buf.hasRemaining()) {
		socketChannel.write(buf);
	}		
	buf.clear();
}

SocketChannelは従来のI/OのSocketに相当し、ServerSocketChannelはServerSocketに相当する.また全体的な形式は一貫しており,多重化の考え方を利用してサーバ側で接続を受信した後,クライアントとデータ伝送を行う専用のSocketを生成する.具体的な形式は「serverSocketChannel.accept()」です.接続を受信すると、コードを参照してSocketChannelが返されます.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//      
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while (true) {
	//accept()       ,     ,   SocketChannel;    null
	SocketChannel socketChannel = serverSocketChannel.accept();
	if (socketChannel != null) {
		//   SocketChannel      
	}
}

DatagramChannelはDatagramPacketとよく似ていますが、パケットは当初のbyte配列から現在のByteBufferに変わりました
DatagramChannel channel = DatagramChannel.open();
//      
channel.socket().bind(new InetSocketAddress(9999));
channel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(48);
/*
 * 1.  UDP         ,     TCP       ,          。
 * 2.receive()       ,        ,   null;
 * 		    ,      byteBuffer,     SocketAddress  (    IP   )
 * 3.  Buffer        ,         
 */
while(channel.receive(buf)==null){
	//     
}
buf.flip();
//      SocketAddress
channel.send(buf, new InetSocketAddress("127.0.0.1", 8888));

DatagramChannelには、ネットワーク内の特定のアドレスに「接続」できる特別な場所があり、TCP接続に非常に似ています.ただし、UDPは接続されていないため、特定のアドレスに接続するとTCPチャネルのように本物の接続は作成されません.DatagramChannelをロックし、特定のアドレスからしかデータを送受信できないようにします.この機能を実現するには、TCPと非常に似たような書き方をしたいので、書かないでください.文書を見に行くでしょう.説明がはっきりしています.
Selector
今から私たちの今日のテーマSelectorに入ります
前書きでは、Selectorとは何か、なぜSelectorを使うのかを簡単に説明しています.ここではもう繰り返しません(あなたはもう忘れたと思います.帰ってもう一度見てみましょう)、私たちはやはり最も基礎的な創建から話します.
Selectorの作成はSelectorを呼び出すことによって行われる.Open()メソッドで完了しました(この部分はopen()で作成されています)
Selector登録
作成すると言ったら、チャンネルとSelectorを組み合わせて使う方法を話さなければなりませんか?一言で言えば、「チャンネルをselectorに登録する」という動作はSelectionKeyチャンネルを通じて行われます.register(Selector sel,int ops,Object att)メソッドが完了しました.
ここで強調したいのは、registerを呼び出すchannelが非ブロックでなければならないことです.これでFileChannelを排除します(通話料をチャージして送るのはだめです).
register()の各パラメータの意味について説明します.最初のパラメータは、channelをどのSelectorに登録するかです.2番目のパラメータは、「interestコレクション」です.Selectorによるチャネルのリスニング時にどのようなイベントに興味があるかを意味し、Connect、Accept、Read、Writeの4つの異なるタイプのイベントをリスニングできます.4つの意味は次のとおりです.
  • Connect(SelectionKey.OP_CONNECT):1つのチャネルが別のサーバに正常に接続されました.接続準備完了
  • Accept(SelectionKey.OP_ACCEPT):新しい接続を受信する準備ができているサーバSocketchannel--「受信準備完了」
  • Read(SelectionKey.OP_READ):1つのチャネルの読み取り可能なデータが準備されている--「読み取り準備完了」
  • Write(SelectionKey.OP_WRITE):1つのチャネルの書き込み可能なデータが準備されています.「書き込み準備完了」
  • P.S:括弧の中にあるのは、2番目のパラメータopsの位置に記入するint定数です.私たちはこの4つを「興味のある事件」と呼び、後で何度もこの概念に言及します.
    複数のイベントに興味がある場合は、次のように定数を「ビットまたは」オペレータで接続できます.int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;register()メソッドの3番目のパラメータは、同じSelectorに登録されている他のチャネルを区別するために使用される各チャネルの識別子として使用できるObjectオブジェクトである追加オブジェクトです.他のオブジェクトをアタッチすることもできます.
    最後にregister()メソッドの戻り値を見てみましょう.戻り値はSelectionKeyオブジェクトです.これは重要なオブジェクトです.次に、SelectionKeyについて説明します.
    SelectionKey
    Selectorは、いくつかのチャネル内の関心のあるイベントが発生したことを発見すると、対応するチャネルのSelectionKeyオブジェクトを返します.
    SelectionKeyオブジェクトには多くの情報が含まれています.例えば、チャネルに属するチャネルオブジェクトはselectionKey.channel()メソッドは得られます.チャネルの追加オブジェクトもあり、selectionKey.attachment()メソッドは得られます.チャネルの関心時間が次の4つの方法で得られることもできます.
  • boolean selectionKey.isAcceptable()
  • boolean selectionKey.isConnectable()
  • boolean selectionKey.isReadable()
  • boolean selectionKey.isWritable()

  • さらに詳細については、文書を見に行くを参照してください.
    Selector.select()
    以前の作成、登録などの準備が完了したら、準備したデータが来るのを待つことができます.この場合、興味のあるイベントがいくつあるかを知る必要があります.この時、次の3つの方法でこの任務を完成させます.それぞれは
  • int selector.select()
  • int selector.select(long timeout)
  • int selector.selectNow()

  • まず,この3つの方法の正確な役割を述べ,それらはいずれも複数のチャネルが準備状態になったことを返す.違いは次のとおりです.
  • select()はブロックされており、チャネルの準備が整うまで待っています.
  • select(long timeout)もブロックされ、チャネルの準備が完了するか、所定のtimeout時間を超えて0に戻るまで待機します.
  • selectNow()は非ブロックであり、チャネルの準備ができていない場合は直接0に戻ります.

  • Selector.selectedKeys()
    select()メソッドでは、いくつかのチャネルの準備が完了していることがわかります.次のメソッドを呼び出して、対応するいくつかのチャネルのselectedKeyを返すことができます.Set<SelectionKey> selectedKeys = selector.selectedKeys() selectedKeysを取得すると、対応する処理ができます.1つのselectionKeyを処理するたびにSetから削除する必要があることを強調し、次回の準備ができたら再度Setに追加することができます.
    ここでSelectorについての知識は基本的に終わりますので、一つのサーバ側、クライアントが文字列を送受信する例で今回の説明を終わりましょう.
    クライアント
    public class HansClient {
    	//     SocketChannel Selector  
    	private Selector selector = null;
    	//    SocketChannel
    	private SocketChannel sc = null;
    
    	public void init() throws IOException {
    		selector = Selector.open();
    		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
    		//   open              SocketChannel
    		sc = SocketChannel.open(isa);
    		//    sc        
    		sc.configureBlocking(false);
    		//  SocketChannel       Selector
    		sc.register(selector, SelectionKey.OP_READ);
    		//              
    		new ClientThread().start();
    		//        
    		Scanner scan = new Scanner(System.in);
    		while (scan.hasNextLine()) {
    			//       
    			String line = scan.nextLine();
    			//            SocketChannel 
    			sc.write(StandardCharsets.UTF_8.encode(line));
    		}
    	}
    
    	//             
    	private class ClientThread extends Thread {
    		public void run() {
    			try {
    				while (selector.select() > 0) {
    					//        IO  Channel   SelectionKey
    					for (SelectionKey sk : selector.selectedKeys()) {
    						//        SelectionKey
    						selector.selectedKeys().remove(sk);
    						//    SelectionKey   Channel       
    						if (sk.isReadable()) {
    							//   NIO  Channel    
    							SocketChannel sc = (SocketChannel) sk.channel();
    							ByteBuffer buff = ByteBuffer.allocate(1024);
    							String content = "";
    							while (sc.read(buff) > 0) {
    								sc.read(buff);
    								buff.flip();
    								content += StandardCharsets.UTF_8.decode(buff);
    							}
    							//          
    							System.out.println("    :" + content);
    						}
    					}
    				}
    			} catch (IOException ex) {
    				ex.printStackTrace();
    			}
    		}
    	}
    
    	public static void main(String[] args) throws IOException {
    		new HansClient().init();
    	}
    }

    サーバ側
    public class HansServer {
    	//       Channel   Selector
    	private Selector selector = null;
    
    	public void init() throws IOException {
    		selector = Selector.open();
    		//   open           ServerSocketChannel  
    		ServerSocketChannel server = ServerSocketChannel.open();
    		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
    		//   ServerSocketChannel     IP  
    		server.socket().bind(isa);
    		//   ServerSocket        
    		server.configureBlocking(false);
    		//  server     Selector  
    		server.register(selector, SelectionKey.OP_ACCEPT);
    		while (selector.select() > 0) {
    			//     selector        SelectionKey
    			for (SelectionKey sk : selector.selectedKeys()) {
    				//  selector     Key         SelectionKey
    				selector.selectedKeys().remove(sk);
    				//   sk               
    				if (sk.isAcceptable()) {
    					//   accept      ,         SocketChannel
    					SocketChannel sc = server.accept();
    					//          
    					sc.configureBlocking(false);
    					//   SocketChannel    selector
    					sc.register(selector, SelectionKey.OP_READ);
    				}
    				//   sk            
    				if (sk.isReadable()) {
    					//    SelectionKey   Channel, Channel       
    					SocketChannel sc = (SocketChannel) sk.channel();
    					//            ByteBuffer
    					ByteBuffer buff = ByteBuffer.allocate(1024);
    					String content = "";
    					//       
    					try {
    						while (sc.read(buff) > 0) {
    							buff.flip();
    							content += StandardCharsets.UTF_8.decode(buff);
    						}
    						//     sk   Channel       
    						System.out.println("=====" + content);
    					}
    					//       sk   Channel     ,    Channel
    					//    Client     ,   Selector   sk   
    					catch (IOException ex) {
    						//  Selector      SelectionKey
    						sk.cancel();
    						if (sk.channel() != null) {
    							sk.channel().close();
    						}
    					}
    					//   content     0,        
    					if (content.length() > 0) {
    						//    selector      SelectKey
    						for (SelectionKey key : selector.keys()) {
    							//    key   Channel
    							Channel targetChannel = key.channel();
    							//    channel SocketChannel  
    							if (targetChannel instanceof SocketChannel) {
    								//          Channel 
    								SocketChannel dest = (SocketChannel) targetChannel;
    								dest.write(StandardCharsets.UTF_8.encode(content));
    							}
    						}
    					}
    				}
    			}
    		}
    	}
    
    	public static void main(String[] args) throws IOException {
    		new HansServer().init();
    	}
    }

    今回の説明はここまでで、本シリーズの説明もここまでです.もしあなたがここを見ることができたら、私は本当に嬉しいです.何事も私と議論することができます.