Java NIOのセレクタ分析

10270 ワード

目次
概要
セレクタSelector
セレクトキーSelectionKey
Selectorによるチャネルの選択
ケース
 
概要
従来のIOストリームリクエストは、リクエストを処理するためにスレッドが必要であり、リクエスト数が膨大であれば、オペレーティングシステムにとってスレッドは一定のメモリを消費し、スレッドコンテキストの切り替えにもオーバーヘッドがかかります.セレクタSelectorはjava NIOで複数のチャネルの準備状況(読み書きの準備ができているかどうか)を検出し、単一のスレッドを使用して複数のチャネルを管理します.スレッドは、セレクタに登録されているチャネルが準備完了状態にあるか、周期的なポーリングがチャネルが準備完了状態にあるかどうかを検出するまで休眠できます.1つ以上の選択可能なチャネルがセレクタオブジェクトに登録されると、セレクタとチャネルを表すキーが返され、選択キーオブジェクトには対応するチャネルとセレクタが含まれています.なお、セレクタはチャネルとともに使用され、チャネルは選択可能(すなわち、SelectableChannelクラスを継承)である必要があり、非ブロックモードであるため、FileChannelはセレクタSelectorと一緒に使用できず、すべてのソケットチャネルが可能である.
セレクタSelector
1.作成
セレクタオブジェクトは、静的メソッドopen()をチャネル呼び出してインスタンスを作成することができる.
Selector selector = Selector.open();

2.登録チャネル
選択可能チャネル(SelectableChannel)は、すべてのソケットチャネルを含み、FileChannelチャネルを含まない場合はセレクタに登録し、興味のある操作を指定できます.チャネルをセレクタに登録する前に、チャネルを非ブロックに設定する必要があります.チャネル内でregister()を定義し、ブロックモードを設定する方法を選択できます.これらのチャネルを管理するのは本当にセレクタであり、セレクタとチャネルを表すキーも管理されます.
channel.configureBlocking(false);
SelectionKey key = channel.register(selector,int ops);

register()メソッドの2番目のパラメータは、関心のある操作を表し、セレクタはチャネル関心のあるイベントを監視し、4つの異なるタイプのイベントまたは操作を含む:リード(read)、ライト(write)、接続(connect)、受け入れ(accept)、すべての操作がすべてのチャネルで使用できるわけではありません.たとえば、SocketChannelはaccept()操作をサポートしていません.呼び出しチャネル内のメソッドvalidOps()メソッドは、チャネルがサポートできるアクションを取得することができ、4つの異なるアクションは、選択キーSelectionKeyの4つの定数で表される.
  • SelectionKey.OP_CONNECT.
  • SelectionKey.OP_ACCEPT.
  • SelectionKey.OP_READ.
  • SelectionKey.OP_WRITE.

  • 2つ以上の関心のあるイベントについては、ビット操作「または」接続を使用することができる.
    int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

    3.クローズ
    セレクタを使用しない場合、close()を呼び出してすべての占有リソースを解放し、すべての選択キーを無効に設定します.
    4.wakeUp()
    あるスレッドがselect()メソッドを呼び出すとブロックされ、チャネルが準備されていない場合、別のスレッドを使用して最初のスレッドがselect()メソッドを呼び出すオブジェクトの上でwakeUp()を呼び出すと、select()メソッドにブロックされたスレッドをすぐに返すことができます.
    セレクトキーSelectionKey
    いずれのチャネルとセレクタの登録関係もSelectionKeyオブジェクトにカプセル化されている.選択キーオブジェクトには、次の属性が含まれます.
  • interest集合
  • ready集合
  • channelとselector
  • アタッチメント
  • 1.interestコレクション
    チャネルとセレクタのコンビネーションが関係するオブジェクトを表すために使用されます.現在のinterestコレクションは、キーオブジェクトのinterestOps()メソッドを呼び出すことで取得できます.interestコレクションと選択キー定数を指定してビットアンド操作を行い、interestコレクションに特定のイベントが含まれているかどうかを確認できます.
    int interestSet = selectionKey.interestOps();
    boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
    boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
    boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
    boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
    

    2.ready集合
    チャネルが実行するアクションの準備ができていることを示し、キーオブジェクトのメソッドreadOps()を呼び出して関連チャネルが準備されているアクションを取得します.readyコレクションはinterestコレクションのサブセットであり、interestコレクションでselect()が最後に呼び出された後に準備されたアクションを示します.指定した操作に対するinterestコレクションの「ビットアンド」操作により、selectionKey.readOps()などのチャネル関連操作の準備を表示できます. &SelectionKey.OP_WRITEは、readyコレクションに読み取り操作が含まれているかどうかを確認します.ただし、選択キーSelectionKeyオブジェクトには、次の4つの方法があります.
    int readySet = selectionKey.readyOps()
    selectionKey.isAcceptable();
    selectionKey.isConnectable();
    selectionKey.isReadable();
    selectionKey.isWritable();
    

    3.channelとselector
    選択キーオブジェクトでは、登録されたチャネルとセレクタを取得できます.
    Channel  channel  = selectionKey.channel();
    Selector selector = selectionKey.selector();
    

    4.オブジェクトの添付
    選択キーオブジェクトには、選択キーに添付ファイルを追加し、後で取得できる方法があります.任意のオブジェクトをキーに関連付ける便利な方法です.
    selectionKey.attach(theObject);
    Object attachedObj = selectionKey.attachment();
    

    さらに、SelectableChannelは、Objectタイプのパラメータを受信するためにregister()メソッドのリロードバージョンを提供します.チャネルが選択キーに登録されている場合は、添付ファイルを直接追加できます.
    SelectionKey key = channel.register (selector, SelectionKey.OP_READ, myObject);
       
    SelectionKey key = channel.register (selector, SelectionKey.OP_READ);
    key.attach (myObject)

    Selectorによるチャネルの選択
    public abstract int select( ) throws IOException;
    public abstract int select (long timeout) throws IOException;
    public abstract int selectNow( ) throws IOException;
    public abstract Set keys( )
    public abstract Set selectedKeys( )
  • select()---登録されたイベントにチャネルが準備されるまでブロックされます.
  • select(long timeout)----timeoutミリ秒までブロック時間はブロックされません.
  • selectNow()---完全非ブロックモードで、どのチャネルもすぐに戻ります.チャネルの準備ができていない場合は、0に戻ります.
  • keys()----セレクタに関連付けられた登録済みキーのセットは、登録済みキーがすべて有効であるわけではありません.keys()メソッドで表示できます.
  • selectedKeys()---登録キーのサブセットであり、セット内のスレーブメンバーが関連するチャネルであるとセレクタによって準備済みチャネルと判定される.

  • select()メソッドが返すint値は、準備されたすべてのチャネルの総数ではなく、select()が最後に呼び出された後にどれだけのチャネルが準備されているかを示します.select()準備完了のチャネルが呼び出されたが、今回の呼び出し準備完了のチャネルカウントにはカウントされません.
    selectorのメソッドselectedKeys()メソッドで返される「選択キーのセット」は、準備済みのチャネルセットであることを示します.ready集合とは同じ概念ではなく,ready集合はチャネル準備の動作を表す.次のように、選択キーを巡回し、1つの選択キーで対応する準備ができている操作を表示します.
    Selector selector = Selector.open();
    channel.configureBlocking(false);
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
    while(true) {
      int readyChannels = selector.select();
      if(readyChannels == 0) continue;
      Set selectedKeys = selector.selectedKeys();
      Iterator keyIterator = selectedKeys.iterator();
      while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
        if(key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
            // a connection was established with a remote server.
        } else if (key.isReadable()) {
            // a channel is ready for reading
        } else if (key.isWritable()) {
            // a channel is ready for writing
        }
        keyIterator.remove();
      }
    

    ケース
    public class SocketChannelClient {
      private Selector selector=null;
      
      private SocketChannelClient initChannel() throws IOException, ClosedChannelException {
       //    
        SocketChannel channel = SocketChannel.open();
        //        
        channel.configureBlocking(false);
        //     1234
        channel.connect(new InetSocketAddress(1234));
         //     
        selector = Selector.open();
        //          , SelectionKey.OP_CONNECT  
        channel.register(selector, SelectionKey.OP_CONNECT);
        return this;
      }
      public static void main(String[] args) throws IOException {
        new SocketChannelClient().initChannel().listen();
      }
      
      private ByteBuffer buffer = ByteBuffer.allocate(1024);
      
      private void listen() throws IOException {
        System.out.println("     ....");
          //  selector
        while(true) {
          selector.select();
            //            
          Iterator iterator = selector.selectedKeys().iterator();
          while(iterator.hasNext()) {
            SelectionKey key = iterator.next();
            //       ,       ,     
            if(key.isConnectable()) {
              SocketChannel channel = (SocketChannel)key.channel();
              if(channel.isConnectionPending()) {
                channel.finishConnect();
              }
              channel.configureBlocking(false);
              channel.write(writeInfoToServer("this is information from client..."));
              //          ,SelectionKey.OP_READ  
              channel.register(selector, SelectionKey.OP_READ);
              //      ,        
            }else if(key.isReadable()) {
              receiveInfoFromServer(key);
            }
            //            
            iterator.remove();
          }
        }
      }
      private void receiveInfoFromServer(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel)key.channel();
        buffer.clear();
        int count;
        while((count= channel.read(buffer))>0) {
          buffer.flip();
          String info = new String(buffer.array());
          System.out.println("receive information from Server:"+info);
        }
      }
      private static ByteBuffer writeInfoToServer(String info) throws UnsupportedEncodingException {
       return ByteBuffer.wrap(info.getBytes("UTF-8"));
      }
    }
    
    public class SocketChannelServer {
      private static final int port = 1234;
      Selector selector = null;
      
      private SocketChannelServer initSocketChannel() throws IOException, ClosedChannelException {
        //    ServerSocketChannel   
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = socketChannel.socket();
        //        
        socketChannel.configureBlocking(false);
        serverSocket.bind(new InetSocketAddress(port));
         //       Selector
        selector = Selector.open();
        //           
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        return this;
      }
    
      public static void main(String[] args) throws IOException {
        new SocketChannelServer().initSocketChannel().listen();
      }
      private void listen() throws IOException {
        System.out.println("Listening on port "+port);
        //  selector
        while(true) {
          int select = selector.select();
          if(select==0) {
            continue;
          }
          //        
          Iterator iterator = selector.selectedKeys().iterator();
          while(iterator.hasNext()) {
            SelectionKey key = iterator.next();
            //         
            if(key.isAcceptable()) {
              ServerSocketChannel  serverChannel =(ServerSocketChannel) key.channel();
              SocketChannel channel = serverChannel.accept();
              channel.configureBlocking(false);
              channel.register(selector, SelectionKey.OP_READ);
              sayHello(channel);
            }
             //        ,        
            if(key.isReadable()) {
              readInfoFromClient(key);
            }
            iterator.remove();
          }
        }
      }
     
      private void readInfoFromClient(SelectionKey key) throws IOException {
       SocketChannel channel = (SocketChannel)key.channel();
       int count;
       buffer.clear();
       while((count= channel.read(buffer))>0) {
         buffer.flip();
         String info = new String(buffer.array());
         System.out.println("receive information from client: "+info);
       }
      }
      private ByteBuffer buffer = ByteBuffer.allocate(1024);
      private void sayHello(SocketChannel channel) throws IOException {
        buffer.clear();
        buffer.put("welcome to here!
    \r".getBytes()); buffer.flip(); channel.write(buffer); } }