非同期チャネルと非同期演算結果


非同期チャネルと非同期演算結果
以下の内容は孫衛琴の書いた「Javaネットワークプログラミング核心技術詳細解」の第12章を参照してください。ソースコードのダウンロード先:http://lesson.javathinker.net/javanet/javanetsourcecode.rar
JDK 7から、非同期チャネルを表すAynchronousSocketChanel類とAynchronousServer SocketChanel類が導入されました。この2つのタイプの役割は、SocketChanel類とServerSocketChanel類と似ています。違いは、非同期チャネルのいくつかの方法はいつも非閉塞モード式を採用しています。方法を保存するための非同期演算結果。
AynchronousSocketChanel類には以下の非閉塞方法があります。
  • Future connect:リモートホストに接続する。
  • Future read(ByteBuffer dst):チャネルからデータを読み込み、ByteBufferに格納する。Futureオブジェクトには、実際にチャネルから読み出されたバイト数が含まれています。
  • Future write(ByteBuffer src):ByteBufferのデータをチャネルに書き込みます。Futureオブジェクトには実際の書き込みチャネルのバイト数が含まれています。
  • AynchronousServerSocketChanel類には以下のような非閉塞方法があります。
  • Future accept():顧客接続要求を受け付ける。Futureオブジェクトには接続確立後に作成されたAynchronousSocketChanelオブジェクトが含まれています。
  • 非同期チャネルを使用して、プログラムを並列に複数の非同期動作を実行させることができる。
    SocketAddress socketAddress=……;
    AsynchronousSocketChannel client= AsynchronousSocketChannel.open();
    //      
    Future connected=client.connect(socketAddress);
    ByteBuffer byteBuffer=ByteBuffer.allocate(128);
    
    //      
    //……
    
    //      
    connected.get();  
    //    
    Future future=client.read(byteBuffer);
    
    //      
    //……
    
    //           
    future.get();
    
    byteBuffer.flip();
    WritableByteChannel out=Channels.newChannel(System.out);
    out.write(byteBuffer);
    以下のPingCient類は非同期チャネルの使い方を実証した。ユーザーが入力したドメイン名(つまり、ネットワーク上のホストの名前)を継続的に受信し、このホスト上の80ポートと接続を確立し、最後に接続を確立するまでの時間を印刷します。指定されたホストにプログラムが接続できない場合は、関連エラー情報を印刷します。ユーザが「bye」を入力すると、プログラムは終了します。PingCientクラスを実行する際にユーザが入力した情報とプログラム出力の情報です。ここでは、非斜体フォントを採用した行は、ユーザがコンソールに入力した情報を表し、斜体フォントを用いた行はプログラムの出力結果を表します。
    C:\chapter 04\classis>java nonblock.PingCientwn.abc 888 cowwww.javathinker.netping www.abc 888.comの結果:接続失敗ping www.javathinker.netの結果:20 msbye
    以上のプリント結果から、PingCientがリモートホストに接続されたwww.javathinker.netは20 msを使っていますが、www.abc 888 comホストに接続できませんでした。印刷結果からも、PingCientは非同期通信方式を採用しており、ユーザーがホスト名を入力すると、プログラムがホスト名に対する処理結果を出力するまで待つ必要がなく、次のホスト名を入力し続けることができる。各ホスト名に対する処理結果は接続が成功したか失敗した後にプリントアウトします。
    /* PingClient.java */
    package nonblock;
    import java.net.*;
    ……
    class PingResult {  //           
      InetSocketAddress address;
      long connectStart;  //        
      long connectFinish = 0;  //        
      String failure;
      Future connectResult;  //           
      AsynchronousSocketChannel socketChannel;
      String host;
      final String ERROR="    ";
    
      PingResult(String host) {
          try {
              this.host=host;
              address =
                  new InetSocketAddress(InetAddress.getByName(host),80);
          } catch (IOException x) {
              failure = ERROR;
          }
      }  
    
      public void print() {  //             
          String result;
          if (connectFinish != 0)
              result = Long.toString(connectFinish - connectStart) + "ms";
          else if (failure != null)
              result = failure;
          else
              result = "Timed out";
          System.out.println("ping "+ host+"   " + " : " + result);
      }
    }
    
    public class PingClient{
      //    PingResult     
      private LinkedList pingResults=
                   new LinkedList();
      boolean shutdown=false;
      ExecutorService executorService;
    
      public PingClient()throws IOException{
        executorService= Executors.newFixedThreadPool(4);
        executorService.execute(new Printer());
        receivePingAddress();
      }
    
      public static void main(String args[])throws IOException{
        new PingClient();
      }
    
      /**            ,      PingHandler   */  
      public void receivePingAddress(){
        try{
          BufferedReader localReader=new BufferedReader(
                        new InputStreamReader(System.in));
          String msg=null;
          //           
          while((msg=localReader.readLine())!=null){
            if(msg.equals("bye")){
              shutdown=true;
              executorService.shutdown();
              break;
            }
            executorService.execute(new PingHandler(msg));
          }
        }catch(IOException e){ }
      }
    
      /**         ,          PingResults      */
      public void addPingResult(PingResult pingResult) {
         AsynchronousSocketChannel socketChannel = null;
         try {
           socketChannel = AsynchronousSocketChannel.open();
    
           pingResult.socketChannel=socketChannel;
           pingResult.connectStart = System.currentTimeMillis();
    
           synchronized (pingResults) {
             // pingResults       PingResult  
             pingResults.add(pingResult);
             pingResults.notify();
           }
    
           Future connectResult=
               socketChannel.connect(pingResult.address);
           pingResult.connectResult = connectResult;
        }catch (Exception x) {
          if (socketChannel != null) {
            try {socketChannel.close();} catch (IOException e) {}
          }
          pingResult.failure = pingResult.ERROR;
        }
      }
    
      /**   PingResults                  */
      public void printPingResults() {
        PingResult pingResult = null;
        while(!shutdown ){
          synchronized (pingResults) {
            while (!shutdown && pingResults.size() == 0 ){
              try{
                pingResults.wait(100);
              }catch(InterruptedException e){e.printStackTrace();}
            }
    
            if(shutdown  && pingResults.size() == 0 )break;
            pingResult=pingResults.getFirst();
    
            try{
              if(pingResult.connectResult!=null)
                pingResult.connectResult.get(500,TimeUnit.MILLISECONDS);
            }catch(Exception e){
                pingResult.failure= pingResult.ERROR;
            }
    
            if(pingResult.connectResult!=null
               && pingResult.connectResult.isDone()){
    
              pingResult.connectFinish = System.currentTimeMillis();
            }
    
            if(pingResult.connectResult!=null
               && pingResult.connectResult.isDone()
               || pingResult.failure!=null){
    
               pingResult.print();
               pingResults.removeFirst();
               try {
                  pingResult.socketChannel.close();
                } catch (IOException e) { }
             }
          }
        }
      }
    
      /**         ,    PingResult  ,
              PingResults      */
      public class PingHandler implements Runnable{
        String msg;
        public PingHandler(String msg){
            this.msg=msg;  
        }
        public void run(){
            if(!msg.equals("bye")){
              PingResult pingResult=new PingResult(msg);
              addPingResult(pingResult);
            }
        }
      }
    
      /**   PingResults                  */
      public class Printer implements Runnable{
        public void run(){
            printPingResults();
        }
      }
    }
    以上のPingResult類は一つのホストに接続した実行結果を表します。PingCient類のPingResultsキューは、すべてのPingResultオブジェクトを保存します。PingCientクラスはまた、特定のタスクを表す2つの内部クラスを定義しています。
  • PingHandlerタスククラス:非同期チャネルを介してクライアント入力のホストアドレスに接続する試みを行い、PingResultオブジェクトを作成し、接続動作の非同期演算結果を含む。PingResultオブジェクトをPingResults結果列に追加します。
  • Printerタスククラス:PingResults結果キューの印刷を担当します。印刷済みのPingResultオブジェクトはPingResultsキューから削除されます。
  • PingCient類のメインスレッドは以下の操作を完了しました。
  • はスレッドプールを作成する。
  • は、スレッド池にPrinterタスクを提出する。
  • は、クライアントが入力したホストアドレスを継続的に読み取り、PingHandlerタスクをスレッド池に提出する。クライアントが「bye」を入力するとプログラムが終了します。
  • PingCient類のスレッド池は以下の操作を完了しました。
  • は、Printerタスクを実行する。
  • はPingHanderタスクを実行します。