Java_ioシステムのPipedInputStream、PipedOutputStreamの概要、ソースコードおよび例-06


Java_ioシステムのPipedInputStream/PipedOutputStreamの概要、ソースコードおよび例-06
       ——パイプ出力ストリームは、パイプ入力ストリームの上に構築する必要があるので、まずパイプ出力ストリームについて説明します.ソースコードを先に読んだり、まとめたりして書くのがちょっとQなので、無視したり、間違ったところを指摘したりするのは感謝に堪えません.
一:PipedOutputStream
1、       クラス機能の概要:
パイプバイト出力ストリームは、現在のスレッドの指定バイトをこのスレッドに対応するパイプ入力ストリームに書き込むために使用されるので、PipedInputStream(pis)、PipedOutputStream(pos)は組み合わせて使用する必要があります.パイプバイト出力ストリームの本質はpisを呼び出す方法でバイトまたはバイト配列をpisに書き込むことであり,これは異なる点である.したがってposの方法は少なく簡単で、主に伝達されたpisを自身にバインドし、ペアで使用し、次にバインドされたpisの書き込み方法を呼び出し、バイトまたはバイト配列をpisのキャッシュバイト配列に書き込むことを担当する.
2、       PipedOutputStreamAPIの概要:
A:キーフィールド
	PipedInputStream sink; 	          pos          

B:構築方法
 
	PipedOutputStream(PipedInputStream snk);	    snk   pos  
	
	PipedOutputStream();	     、       PipedInputStream  

C:一般的な方法
	synchronized void connect(PipedInputStream snk);	    snk   pos  
	
	void write(byte b) ;	     snk.receive(byte b)    b  snk 
	
	void write(byte[] b, int off, int len);		     snk.receive(byte[] b, int off, int len)     b       snk 。
	
	synchronized void flush();		           snk 
	
	close();	    pos、    、  snk              

 
3、       ソース分析:
package com.chy.io.original.code;

import java.io.*;

/**
 *      。          、         。
 * @author  andyChen
 * @version 1.1, 13/11/19
 */
public class PipedOutputStream extends OutputStream {

	//  PipedOutputStream   PipedInputStream  
    private PipedInputStream sink;

    /**
     *      pis   pos
     */
    public PipedOutputStream(PipedInputStream snk)  throws IOException {
    	connect(snk);
    }
    
    /**
     *      、             pis
     */
    public PipedOutputStream() {
    }
    
    /**
     *  “     ”   “     ”  。        、      
     *            pos   pis  。
     */
    public synchronized void connect(PipedInputStream snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
        	throw new IOException("Already connected");
		}
		sink = snk;
		snk.in = -1;
		snk.out = 0;
        snk.connected = true;
    }

    /**
     *  int  b  “     ” 
     *  b  “     ”  ,   b   “     ”、            pos   pis   pis buffer 
     */
    public void write(int b)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(b);
    }

    /**
     *      b  “     ” 
     *    b  “     ”  ,       “     ”、          
     */
    public void write(byte b[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
	    throw new NullPointerException();
		} else if ((off < 0) || (off > b.length) || (len < 0) ||
			   ((off + len) > b.length) || ((off + len) < 0)) {
		    throw new IndexOutOfBoundsException();
		} else if (len == 0) {
		    return;
		} 
		sink.receive(b, off, len);
    }

    /**
     *     、      pis        、  、    pos    
     */
    public synchronized void flush() throws IOException {
		if (sink != null) {
	            synchronized (sink) {
	                sink.notifyAll();
	            }
		}
    }

    /**
     *   “     ”。  pis、     pos   、 pis                、        
     */
    public void close()  throws IOException {
		if (sink != null) {
		    sink.receivedLast();
		}
    }
}

 
4、       実例:
PipedOutputStreamはPipedInputStreamと組み合わせて使用する必要があるため、両方の例を一緒にします.
二:PipedInputStream
1、       クラス機能の概要:
パイプバイト入力ストリーム、対応するバインドされたパイプバイト出力ストリームを読み込むための内蔵バイトキャッシュ配列buffer内のバイトを書き込むことでスレッド間の通信を実現し、pisにはpos呼び出し、、receive(byte b)、receive(byte[]b,int off,intlen)、posがpisのbufferにバイトまたはバイト配列を書き込むことができるように、
2、       PipedInputStream APIの概要:
A:キーフィールド
    //             
    boolean closedByWriter = false;
   
    //             
    volatile boolean closedByReader = false;
    //     、         
    boolean connected = false;

    //           
    Thread readSide;
    //            
    Thread writeSide;

    //        
    private static final int DEFAULT_PIPE_SIZE = 1024;

    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    //         
    protected byte buffer[];

    //      buf      、in==out            
    protected int in = -1;

    //        
    protected int out = 0;

B:構築方法
    PipedInputStream(PipedOutputStream src);			       buf、                     pis
    
    PipedInputStream(PipedOutputStream src, int size);		         buffer  、       pos  pis
    
    PipedInputStream();		          pis、 pis               pos。
	
    PipedInputStream(int pipeSize);		          pis、 pis               pos。

コアの方法は2つ目!
C:一般的な方法
	void connect(PipedOutputStream src);	 pis pos  、      pos connect  、
	
	synchronized void receive(byte b);		     pis   pos        pis buffer 、   pis  
	
	synchronized void receive(byte[] b, int off, int len);		     pis   pos     b         pis buffer 、   pis  
	
	void receivedLast();	       pos   、pos         pis、    、      pis         
	
	int read();		        pis buffer       、        
	
	int read(byte[] b,int off, int len);	      len        off  、  len    、          
	
	synchronized int available();		        、          
	
	void close();		      buffer

3、       ソース分析:
package com.chy.io.original.code;

import java.io.IOException;
import java.io.PipedOutputStream;

/**
 *        、            、         。
 * @author  andyChen
 * @version 1.1, 13/11/19
 */
public class PipedInputStream extends InputStream {
	//             
    boolean closedByWriter = false;
    //             
    volatile boolean closedByReader = false;
    //     、         
    boolean connected = false;

    //           
    Thread readSide;
    //            
    Thread writeSide;

    //        
    private static final int DEFAULT_PIPE_SIZE = 1024;

    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    //         
    protected byte buffer[];

    //      buf      、in==out            
    protected int in = -1;

    //        
    protected int out = 0;

    /**
     *        buf、                     pis
     */
    public PipedInputStream(PipedOutputStream src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    /**
     *         buf、                     pis
     */
    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
    	initPipe(pipeSize);
    	connect(src);
    }

    /**
     *           pis、 pis               pos。
     */
    public PipedInputStream() {
    	initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     *           pis、 pis               pos。
     */
    public PipedInputStream(int pipeSize) {
    	initPipe(pipeSize);
    }
    
    //   buf  。
    private void initPipe(int pipeSize) {
         if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         }
         buffer = new byte[pipeSize];
    }

    /**
     *  pis pos  、        、       pos connect(pis)    
     */
    public void connect(PipedOutputStream src) throws IOException {
    	src.connect(this);
    }

    /**
     *   int     b。
     *     PipedOutputStream write(int b)     
     */
    protected synchronized void receive(int b) throws IOException {
    	//       
        checkStateForReceive();
        //   “    ”   、     pis  buffer                   
        writeSide = Thread.currentThread();
        if (in == out)
        	//  “    ”           ,   。
            awaitSpace();
		if (in < 0) {
		    in = 0;
		    out = 0;
		}
		//  b      
		buffer[in++] = (byte)(b & 0xFF);
		if (in >= buffer.length) {
		    in = 0;
		}
    }

    /**
     *       b    
     */
    synchronized void receive(byte b[], int off, int len)  throws IOException {
    	//       
        checkStateForReceive();
        //   “    ”   
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;
        while (bytesToTransfer > 0) {
        	//  “    ”           ,   。
            if (in == out)
                awaitSpace();
            //      buff     
            int nextTransferAmount = 0;
            //            ,         ;
            //    nextTransferAmount=buffer.length - in
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
            	//   “         ,  /         ”,        
            	//  in==-1 buffer     ,     in、out  nextTransferAmount = buffer.length - in;
            	//   ,nextTransferAmount = out - in;
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            // assert      , nextTransferAmount <= 0,     。                   。
            assert nextTransferAmount > 0;
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {
                in = 0;
            }
        }
    }
  
    /**
    *       、      、  pos、pis     、              、    
    */
    private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
	    throw new IOException("Pipe closed");
	} else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }
    /**
     *   。
     *  “    ”           (  ,     ),   awaitSpace()  ;
     *       “       ”          ,        “  ”     。
     */
    private void awaitSpace() throws IOException {
    	//   “         ,         ” ,
    	//    1000ms  “    ”,       :  “           ”,      。
		while (in == out) {
		    checkStateForReceive();
		    /* full: kick any waiting readers */
		    notifyAll();
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
		    	throw new java.io.InterruptedIOException();
		    }
		}
    }

    /**
     *  PipedOutputStream    ,   
     */
    synchronized void receivedLast() {
		closedByWriter = true;
		notifyAll();
    }

    /**
     *              ,      int  
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
	    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive()
	                   && !closedByWriter && (in < 0)) {
	            throw new IOException("Write end dead");
		}
	
	        readSide = Thread.currentThread();
		int trials = 2;
		while (in < 0) {
		    if (closedByWriter) {
			/* closed by writer, return EOF */
			return -1;
		    }
		    if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
			throw new IOException("Pipe broken");
		    }
	            /* might be a writer waiting */
		    notifyAll();
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
			throw new java.io.InterruptedIOException();
		    }
	 	}
		int ret = buffer[out++] & 0xFF;
		if (out >= buffer.length) {
		    out = 0;
		}
		if (in == out) {
	            /* now empty */
		    in = -1;
		}
	
		return ret;
    }

    /**
     *       len        off  、  len    
     */
    public synchronized int read(byte b[], int off, int len)  throws IOException {
		if (b == null) {
		    throw new NullPointerException();
		} else if (off < 0 || len < 0 || len > b.length - off) {
		    throw new IndexOutOfBoundsException();
		} else if (len == 0) {
		    return 0;
		}
	
	        /* possibly wait on the first character */
		int c = read();
		if (c < 0) {
		    return -1;
		}
		b[off] = (byte) c;
		int rlen = 1;
		while ((in >= 0) && (len > 1)) {
		    int available; 
		    if (in > out) {
			available = Math.min((buffer.length - out), (in - out));
		    } else {
			available = buffer.length - out;
		    }
	
		    // A byte is read beforehand outside the loop
		    if (available > (len - 1)) {
			available = len - 1;
		    }
		    System.arraycopy(buffer, out, b, off + rlen, available);
		    out += available;
		    rlen += available; 
		    len -= available;
		    
		    if (out >= buffer.length) {
			out = 0;
		    }
		    if (in == out) {
	                /* now empty */
			in = -1;
		    }
		}
		return rlen;
    }

    /**
     *         、          
     */
    public synchronized int available() throws IOException {
		if(in < 0)
		    return 0;
		else if(in == out)
		    return buffer.length;
		else if (in > out)
		    return in - out;
		else
		    return in + buffer.length - out;
    }

    /**
     *     
     */
    public void close()  throws IOException {
    	closedByReader = true;
        synchronized (this) {
            in = -1;
        }
    }
}

4、       実例:
一般に、2つのスレッド間の通信に使用される場合、2つのスレッド、1つの出力者、内部にPipedOutputStreamオブジェクトの参照を持ち、受信者にバイト情報を送信するために使用される.1つの受信者、内部にPipedInputStreamオブジェクトの参照を持ち、送信者から送信されたバイト情報を受信し、最後にテストクラスを介して両者を結合し、スレッド間の通信を実現する.SenderThread:単純で、3つのプライベートメソッドがそれぞれ1バイト、1つの短いバイト(1024未満、PipedinputStreamのbufferのデフォルトサイズが1024であるため)1つの長いバイト(1024を超える)を見て、何か違いがあります.
package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedOutputStream;


public class SenderThread implements Runnable {
	private PipedOutputStream pos = new PipedOutputStream();
	
	public PipedOutputStream getPipedOutputStream(){
		return pos;
	}
	@Override
	public void run() {
		//sendOneByte();
		//sendShortMessage();
		sendLongMessage();
	}

	private void sendOneByte(){
		try {
			pos.write(0x61);
			pos.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendShortMessage() {
		try {
			pos.write("this is a short message from senderThread !".getBytes());
			pos.flush();
			pos.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendLongMessage(){
		try {
			byte[] b = new byte[1028];
			//       1028     、 1020  1、 8  2。
			for(int i=0; i<1020; i++){
				b[i] = 1;
			}
			for (int i = 1020; i <1028; i++) {
				b[i] = 2;
			}
			pos.write(b);
			pos.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

ReceiverThread:簡単です.bufferに書き込まれたバイトを受信する方法も3つあります.
package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedInputStream;

public class ReceiverThread extends Thread {
	
	private PipedInputStream pis = new PipedInputStream();
	
	public PipedInputStream getPipedInputStream(){
		return pis;
	}
	@Override
	public void run() {
		//receiveOneByte();
		//receiveShortMessage();
		receiverLongMessage();
	}
	
	private void receiveOneByte(){
		try {
			int n = pis.read();
			System.out.println(n);
			pis.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiveShortMessage() {
		try {
			byte[] b = new byte[1024];
			int n = pis.read(b);
			System.out.println(new String(b, 0, n));
			pis.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiverLongMessage(){
		try {
			byte[] b = new byte[2048];
			int count = 0;
			while(true){
				count = pis.read(b); 
				for (int i = 0; i < count; i++) {
					System.out.print(b[i]);
				}
				if(count == -1)
					break;
			}
			pis.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}	
}

テストクラス:PipedStreamTest、ReceiverThread、SenderThreadの2つのスレッドをパイプフローで結合し、通信効果を表示
package com.chy.io.original.test;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.chy.io.original.thread.Receiver;
import com.chy.io.original.thread.ReceiverThread;
import com.chy.io.original.thread.Sender;
import com.chy.io.original.thread.SenderThread;

@SuppressWarnings("all")   
/**  
 *                 
 */   
public class PipedStreamTest {   
   
    public static void main(String[] args) {   
    	testPipedStream();
    }

    private static void testPipedStream(){
    	SenderThread st =new SenderThread();
    	ReceiverThread rt = new ReceiverThread();
    	PipedInputStream pis = rt.getPipedInputStream();
    	PipedOutputStream pos = st.getPipedOutputStream();
    	
    	try {
			pos.connect(pis);
			
			new Thread(st).start();
			rt.start();
		} catch (IOException e) {
			e.printStackTrace();
		}
    	
    }
    
	private static void exampleFromIE() {
		Sender t1 = new Sender();   
           
        Receiver t2 = new Receiver();   
           
        PipedOutputStream out = t1.getOutputStream();   
 
        PipedInputStream in = t2.getInputStream();   

        try {   
            //    。  2        。
            //out.connect(in);   
            in.connect(out);   
               
            /**  
             * Thread  START  :  
             *         ;Java           run   。   
             *             ;    (       start   )      (    run   )。   
             *             。             ,       。   
             */
            t1.start();
            t2.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
	}
}

結果の説明:
前の2つの書き込みバイト、書き込みshortバイト配列はよく理解されています.つまり、書き込み数の読み取り、後の書き込み長バイト配列、受信スレッドは1つのwhile(true)でbufferのバイトを繰り返し読み出し、読み終わるまで、ループを使わないと1024バイトしか読めません.bufferのデフォルトサイズは1024、posは一度に1024しか書けないので、pisも同様に一度に1024個しか読み取れず、whileを用いた場合、pos書き込みの1020個を初めて読み取った後、pisが再読み取りするとbufferが空になり、受信者スレッドが待機し、送信者にバイトの書き込みを通知し、継続する.
まとめ
PipedInputStream、PipedOutputStreamの両者の結合はオシドリのように、どちらを離れても存在し続けることができないと同時に、連理枝のようにPipedOutputStreamはまずconnect(PipedInputStream snk)によって関係を確定し、PipedInputStream状態を初期化し、PipedInputStreamがこのPipedOutputStreamに属するしかないと訴え、connect=true、PipedInputStreamバイトを贈りたい場合は、receive(byte b)、receive(byte[]b、int off、int len)を直接呼び出して、pisの通帳bufferにバイトまたはバイト配列を入れます.PipedInputStreamの角度に立って、どのPipedOutputStreamを見たときにposを暗示し、posに主導権を渡し、posのconnectを呼び出して自分で登録します.バイトを使おうとするとbufferからバイトをもらいますが、自分でバイトを稼ぐ能力がないので、bufferにバイトがないときは、自分で待っています.posにバイトがないと、posは通帳にバイトを預けます.もちろん、posはずっと貯金しません.通帳が空いているときも、自発的に貯金しません.pisがもらうのを待っています.貯金しなければならない.最后の2つはbufferだけで相手の存在を知っていて、bufferから保存したりバイトを取ったりするたびに相手が安康かどうかを见て、安ければ生活を続けて、一方がいなければ、もう一方も独存したくない!
詳細IOコンテンツ:java_ioシステムのディレクトリ