PipedWriterとPipedReaderのソースコード分析_動力ノードJava学院の整理


PipedWriterとPipedReaderのソースコードの分析
1.PipedWriterソース(jdk 1.7.40ベース) 

package java.io;
 public class PipedWriter extends Writer {
   //  PipedWriter   PipedReader  
   private PipedReader sink;
   // PipedWriter     
   private boolean closed = false;
   //     ,     PipedReader
   public PipedWriter(PipedReader snk) throws IOException {
     connect(snk);
   }
   //     
   public PipedWriter() {
   }
   //  “PipedWriter”   “PipedReader”  。
   public synchronized void connect(PipedReader snk) throws IOException {
     if (snk == null) {
       throw new NullPointerException();
     } else if (sink != null || snk.connected) {
       throw new IOException("Already connected");
     } else if (snk.closedByReader || closed) {
       throw new IOException("Pipe closed");
     }
     sink = snk;
     snk.in = -1;
     snk.out = 0;
     //   “PipedReader” “PipedWriter”      
     // connected PipedReader    ,    “PipedReader PipedWriter”      
     snk.connected = true;
   }
   //      c  “PipedWriter” 。
   //  c  “PipedWriter”  ,   c   “PipedReader”
   public void write(int c) throws IOException {
     if (sink == null) {
       throw new IOException("Pipe not connected");
     }
     sink.receive(c);
   }
   //      b  “PipedWriter” 。
   //    b  “PipedWriter”  ,       “PipedReader”
   public void write(char cbuf[], int off, int len) throws IOException {
     if (sink == null) {
       throw new IOException("Pipe not connected");
     } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < ) {
       throw new IndexOutOfBoundsException();
     }
     sink.receive(cbuf, off, len);
   }
   //   “PipedWriter”。
   //      “PipedReader” notifyAll();
   //     “PipedReader”          ,        (    PipedWriter   )  “PipedWriter”  。
   public synchronized void flush() throws IOException {
     if (sink != null) {
       if (sink.closedByReader || closed) {
         throw new IOException("Pipe closed");
       }
       synchronized (sink) {
         sink.notifyAll();
       }
     }
   }
   //   “PipedWriter”。
   //     ,   receivedLast()  “PipedReader”     。
   public void close() throws IOException {
     closed = true;
     if (sink != null) {
       sink.receivedLast();
     }
   }
 }
2.PipedReaderソース(jdk 1.7.40ベース)    

package java.io;
  public class PipedReader extends Reader {
    // “PipedWriter”       
    boolean closedByWriter = false;
    // “PipedReader”       
    boolean closedByReader = false;
    // “PipedReader” “PipedWriter”       
    //   PipedWriter connect()         true
   boolean connected = false;
   Thread readSide;  //   “  ”     
   Thread writeSide;  //  “  ”       
   // “  ”     
  private static final int DEFAULT_PIPE_SIZE = 1024;
   //    
   char buffer[];
   //          。in==out   ,  “     ”      。
   int in = -;
   //          。in==out   ,  “     ”      。
   int out = ;
   //     :   “PipedReader”   “PipedWriter”
   public PipedReader(PipedWriter src) throws IOException {
     this(src, DEFAULT_PIPE_SIZE);
   }
   //     :   “PipedReader”   “PipedWriter”,  “     ”
   public PipedReader(PipedWriter src, int pipeSize) throws IOException {
     initPipe(pipeSize);
     connect(src);
   }
   //     :        1024  
   public PipedReader() {
     initPipe(DEFAULT_PIPE_SIZE);
   }
   //     :        pipeSize
   public PipedReader(int pipeSize) {
     initPipe(pipeSize);
   }
   //    “  ”:       
   private void initPipe(int pipeSize) {
    if (pipeSize <= 0) {
       throw new IllegalArgumentException("Pipe size <= 0");
     }
     buffer = new char[pipeSize];
   }
   //  “PipedReader” “PipedWriter”  。
   //    ,      PipedWriter connect()  
   public void connect(PipedWriter src) throws IOException {
     src.connect(this);
   }
   //   int     b。
   //     PipedWriter write(int b)     
   synchronized void receive(int c) throws IOException {
     //       
     if (!connected) {
       throw new IOException("Pipe not connected");
     } else if (closedByWriter || closedByReader) {
       throw new IOException("Pipe closed");
     } else if (readSide != null && !readSide.isAlive()) {
       throw new IOException("Read end dead");
     }
     //   “    ”   
     writeSide = Thread.currentThread();
     //   “         ,         ” ,
    //    1000ms  “    ”,       :  “           ”,      。
     while (in == out) {
       if ((readSide != null) && !readSide.isAlive()) {
         throw new IOException("Pipe broken");
       }
       /* full: kick any waiting readers */
       notifyAll();
       try {
        wait(1000);
       } catch (InterruptedException ex) {
         throw new java.io.InterruptedIOException();
       }
     }
    if (in < 0) {
     in = 0;
      out = 0;
     }
     buffer[in++] = (char) c;
     if (in >= buffer.length) {
      in = 0;
     }
   }
   //       b。
   synchronized void receive(char c[], int off, int len) throws IOException {
     while (--len >= ) {
      receive(c[off++]);
     }
   }
   //  PipedWriter    ,   
   synchronized void receivedLast() {
     closedByWriter = true;
     notifyAll();
   }
   //    (   )       ,      int  
   public synchronized int read() throws IOException {
     if (!connected) {
       throw new IOException("Pipe not connected");
     } else if (closedByReader) {
       throw new IOException("Pipe closed");
     } else if (writeSide != null && !writeSide.isAlive()
          && !closedByWriter && (in < )) {
       throw new IOException("Write end dead");
     }
     readSide = Thread.currentThread();
    int trials = 2;
    while (in < 0) {
       if (closedByWriter) {
         /* closed by writer, return EOF */
         return -1;
       }
       if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < )) {
         throw new IOException("Pipe broken");
       }
       /* might be a writer waiting */
       notifyAll();
       try {
        wait(1000);
       } catch (InterruptedException ex) {
         throw new java.io.InterruptedIOException();
       }
     }
     int ret = buffer[out++];
     if (out >= buffer.length) {
      out = 0;
     }
     if (in == out) {
       /* now empty */
      in = -1;
     }
     return ret;
   }
   //    (   )     ,        b 
   public synchronized int read(char cbuf[], int off, int len) throws IOException {
     if (!connected) {
       throw new IOException("Pipe not connected");
     } else if (closedByReader) {
       throw new IOException("Pipe closed");
     } else if (writeSide != null && !writeSide.isAlive()
         && !closedByWriter && (in < 0)) {
       throw new IOException("Write end dead");
     }
    if ((off < 0) || (off > cbuf.length) || (len < 0) ||
      ((off + len) > cbuf.length) || ((off + len) < 0)) {
       throw new IndexOutOfBoundsException();
     } else if (len == 0) {
      return 0;
     }
     /* possibly wait on the first character */
     int c = read();
    if (c < 0) {
      return -1;
     }
     cbuf[off] = (char)c;
    int rlen = 1;
    while ((in >= 0) && (--len > 0)) {
       cbuf[off + rlen] = buffer[out++];
       rlen++;
       if (out >= buffer.length) {
       out = 0;
       }
       if (in == out) {
         /* now empty */
         in = -;
       }
     }
     return rlen;
   }
   //               
   public synchronized boolean ready() throws IOException {
     if (!connected) {
       throw new IOException("Pipe not connected");
     } else if (closedByReader) {
       throw new IOException("Pipe closed");
     } else if (writeSide != null && !writeSide.isAlive()
          && !closedByWriter && (in < )) {
       throw new IOException("Write end dead");
     }
    if (in < 0) {
       return false;
     } else {
       return true;
     }
   }
   //   PipedReader
   public void close() throws IOException {
     in = -;
     closedByReader = true;
   }
 }

次に、マルチスレッドでPipedWriterとPipedReaderを介して通信する例を見てみましょう。例には3つのクラスが含まれています。Receiver.java、Sender.java、PipeTest.java。
Receiver.javaのコードは以下の通りです。  

 import java.io.IOException;  
 import java.io.PipedReader;  
 @SuppressWarnings("all")  
 /** 
  *       
  */  
 public class Receiver extends Thread {  
   //        。
   //   “     (PipedWriter)”    ,
   //       “     ”   ,      。
   private PipedReader in = new PipedReader();  
   //   “       ”
   public PipedReader getReader(){  
     return in;  
   }  
   @Override
   public void run(){  
     readMessageOnce() ;
     //readMessageContinued() ;
   }
   //  “     ”      
   public void readMessageOnce(){  
     //   buf    2048   ,      “     ”   1024   。
    //   ,“     ”          1024   。
     char[] buf = new char[2048];  
     try {  
       int len = in.read(buf);  
      System.out.println(new String(buf,0,len));  
       in.close();  
     } catch (IOException e) {  
       e.printStackTrace();  
     }  
   }
   //  “     ”  >1024    ,     
  public void readMessageContinued(){
    int total=0;
     while(true) {
       char[] buf = new char[];
       try {
         int len = in.read(buf);
         total += len;
         System.out.println(new String(buf,,len));
        //         >1024,     。
       if (total > 1024)
           break;
       } catch (IOException e) {
         e.printStackTrace();
       }
     }
     try {
       in.close(); 
     } catch (IOException e) {  
       e.printStackTrace();  
     }  
   }  
 }
 
Sender.java     :
 
 import java.io.IOException;  
  import java.io.PipedWriter;  
 @SuppressWarnings("all")
 /** 
  *       
  */  
 public class Sender extends Thread {  
   //        。
   //   “     (PipedReader)”    ,
   //           “     ”   ,       “     ”    。
   private PipedWriter out = new PipedWriter();
   //   “     ”  
   public PipedWriter getWriter(){
     return out;
   }  
   @Override
   public void run(){  
     writeShortMessage();
     //writeLongMessage();
   }  
   //  “     ”           :"this is a short message" 
   private void writeShortMessage() {
     String strInfo = "this is a short message" ;
     try {
       out.write(strInfo.toCharArray());
       out.close();  
     } catch (IOException e) {  
       e.printStackTrace();  
     }  
   }
   //  “     ”          
   private void writeLongMessage() {
     StringBuilder sb = new StringBuilder();
    //   for    1020   
     for (int i=0; i<102; i++)
      sb.append("0123456789");
    //    26   。
    sb.append("abcdefghijklmnopqrstuvwxyz");
    // str     1020+26=1046   
    String str = sb.toString();
    try {
      //  1046      “     ” 
       out.write(str);
       out.close();
     } catch (IOException e) {
       e.printStackTrace();
     }
   }
 }
 
PipeTest.javaのコードは以下の通りです。 

 import java.io.PipedReader;
 import java.io.PipedWriter;
 import java.io.IOException;
 @SuppressWarnings("all")  
 /** 
  *                 
  */  
 public class PipeTest {  
   public static void main(String[] args) {  
    Sender t1 = new Sender();  
   Receiver t2 = new Receiver();  
    PipedWriter out = t1.getWriter();  
     PipedReader in = t2.getReader();  
     try {  
       //    。          。
       //out.connect(in);  
       in.connect(out);  
       /** 
       * Thread  START  : 
       *         ;Java           run   。  
       *             ;    (       start   )      (    run   )。  
       *             。             ,       。  
       */
       t.start();
       t.start();
     } catch (IOException e) {
       e.printStackTrace();
     }
   }
 }
実行結果:

this is a short message
結果説明:
(01)in.co nnect(out)
        その役割は「パイプ入力ストリーム」と「パイプ出力ストリーム」を関連付けることです。PipedWriter.javaとPipedReader.javaのconnectのソースコードを調べます。私たちはout.co nnect(in)を知っています。n.co nnect(out)と同等である。
(02)
t 1.start()//「Sender」スレッドを起動します。
t 2.start()///「Receiver」スレッドを起動します。
Sender.javaのソースコードを確認して、スレッドが起動したら、run関数を実行します。Sender.javaのrun()では、writeShotMessage();
writeShot Message();「パイプライン出力ストリーム」にデータ「this is aショートメッセージ」を書き込む役割があります。このデータは「パイプ入力ストリーム」によって受信されます。これはどうやって実現されたのかを見てみます。
まずwrite(char charのソースコードを見ます。PipedWriter.javaはWriter.javaに継承されます。Writer.javaの中でwriteのソースコードは以下の通りです。

public void write(char cbuf[]) throws IOException {
  write(cbuf, 0, cbuf.length);
}
実際にはwrite(char c[])は呼び出しのPipedWriter.javaにおけるwrite関数です。writeのソースコードを調べたら、sink.receive(cbuf,off,len)を呼び出します。さらにreceive(char c[],int off,int len)の定義を調べてみると、私たちはsink.receive(cbuf,off,len)の役割を知っています。つまり、「パイプ出力ストリーム」のデータを「パイプ入力ストリーム」のバッファに保存するということです。一方、「パイプライン入力ストリーム」のバッファサイズは1024文字です。
ここで、私達はt 1.start()がSenderスレッドを起動し、Senderスレッドがデータ「this_a shart message」を「パイプ出力ストリーム」に書き込むことを知っています。また、「パイプライン出力ストリーム」は、このデータを「パイプライン入力ストリーム」、すなわち「パイプライン入力ストリーム」のバッファに保存します。
次に、「ユーザーが『パイプライン入力ストリーム』のバッファからデータを読み取る方法」を見てみます。これは実はReceiverスレッドの動作です。
t 2.start()はReceiverスレッドを起動し、Receiver.javaのrun関数を実行します。Receiver.javaのソースコードを確認して、私達はrun()がreadMessage Onceを呼び起こしたことを知っています。
readMessage Onceとは、「パイプ入力ストリームin」からデータを呼び出してbufに保存することである。
上記の分析により、「パイプ入力フローin」のバッファにおけるデータは「this is aショートメッセージ」であることが分かりました。ですから、bufのデータは「this is a shart message」です。
パイプに対する理解を深めるために。私たちは次の二つの小さなテストを続けます。
試験一:Sender.javaを修正する

public void run(){  
  writeShortMessage();
  //writeLongMessage();
}
に変更

public void run(){  
  //writeShortMessage();
  writeLongMessage();
}
プログラムを実行します。運転結果は以下の通りです。 
その中から、プログラムの運行が間違っていることが分かりました。例外java.io.IOException:Pipe closedをスローします。
どうしてですか?
プログラムの流れを分析します。
(01)PipeTestでは、入出力配管を、in.co nnectで接続する。そして、2つのスレッドが起動されます。t 1.start()はスレッドSenderを起動し、t 2.start()はスレッドReceiverを起動した。
(02)Senderスレッドが起動した後、writeLongMessage()を通じて「出力配管」にデータを書き込み、out.write(str.toCharararray()は1046文字を書き込みました。PipedWriterのソースコードによって、PipedWriterのwrite関数はPipedReaderのreceive()関数を呼びます。PipedReaderのreceive()関数を観察してみると、PipedReaderは受け取るデータストアのバッファエリアを知っています。receive()関数をよく観察すると、次のコードがあります。

while (in == out) {
  if ((readSide != null) && !readSide.isAlive()) {
    throw new IOException("Pipe broken");
  }
  /* full: kick any waiting readers */
  notifyAll();
  try {
    wait(1000);
  } catch (InterruptedException ex) {
    throw new java.io.InterruptedIOException();
  }
}
また、inとoutの初期値はそれぞれin=1,out=0である。上のwhile(in=out)を結合します。私達は知っていて、その意味はパイプの中に1つの文字を書き込むごとに、in=outのこの条件に達しました。その後、notifyAll()を呼び出し、「パイプの読み取りスレッド」を起動します。
つまり、パイプに文字を書き込むたびに、他のスレッドが読み込まれるのを待つのです。
しかし、PipedReaderのバッファのデフォルトサイズは1024です。しかし、このとき書き込むデータは1046があります。したがって、一度に限り1024文字しか書き込みできません。
(03)Receiverスレッドが起動すると、readMessage Once()リードパイプ入力ストリームが起動されます。1024文字を読み込むと、クローズド()が起動し、パイプが閉じられます。
(02)および(03)の分析から、Senderは1046文字をパイプに書き込むことがわかった。ここで、最初の1024文字(バッファ容量は1024)は正常に書き込みでき、書き込みごとに1つ読みます。1025文字を書き込む時、依然としてPipedWriter.javaの中のwrite()を順次呼び出します。その後、write()でPipedReader.javaのreceive()を呼び出します。PipedReader.javaでは、最終的にreceive(int c)関数に呼び出されます。この時、パイプの入力ストリームはすでに閉鎖されています。つまり、closedByReaderはtrueです。だから、throw new IOException(「Pipe closed」)を出します。
私たちは「試験一」を引き続き修正し、この問題を解決します。
試験二:「試験一」に基づいてReceiver.javaを引き続き修正する。

public void run(){  
  readMessageOnce() ;
  //readMessageContinued() ;
}
に変更

public void run(){  
  //readMessageOnce() ;
  readMessageContinued() ;
}
この場合、プログラムは正常に動作します。実行結果は:
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
0123457878901234 5678901234 5678901234 1234 56787878901234 5678787878901234 5678787878789
012345789 abcd
efghijklmnopqrstuvxyz
以上は小编が绍介したPipedWriterとPipedReaderのソースコードの分析です。皆さんに助けてほしいです。もし何か疑问があれば、メッセージをください。小编はすぐに返事します。ここでも私たちのサイトを応援してくれてありがとうございます。