Javaマルチスレッド開発の~~~マルチ条件Conditionインタフェースの使用

7860 ワード

マルチスレッド開発では、このような状況が発生する可能性があります.実行を続行するには、別のスレッドが○○の条件を満たす必要があるか、または
実行するには、他のスレッドがいくつかの条件を満たす必要があります.このような多条件のマルチスレッドが同時に実行される場合、各スレッド間の関係をどのように制御し、彼らを
衝突をうまく処理して問題にならないようにしましょう.Javaが提供するConditionというインタフェースを紹介します.このインタフェースはよく実現されています.
このような需要.
この問題の最も古典的な例は生産者の消費者モデルで、生産者はバッファがいっぱいになったときに商品を生産しないでバッファが空いていることを知っていて、消費します
生産者がバッファに商品を入れるまでバッファが0の場合、商品を持たない.以下、Conditonというインタフェースを使用してこのような需要を実現する.
package com.bird.concursey.charpet4;

/**
 * First, let's implement a class that will simulate a text file. Create a class named
FileMock with two attributes: a String array named content and int named
index. They will store the content of the file and the line of the simulated file that will
be retrieved.
 * @author bird
 * 2014 9 21    6:55:31
 */
public class FileMock {
	
	private String content[];
	private int index;
	
	/**
	 * Implement the constructor of the class that initializes the content of the file with
random characters.
	 * @param size
	 * @param length
	 */
	public FileMock(int size, int length) {
		content = new String[size];
		for(int i = 0; i < size; i++) {
			StringBuilder buffer = new StringBuilder(length);
			for(int j = 0; j < length; j++) {
				int indice = (int) (Math.random() * 255);
				buffer.append((char)indice);
			}
			content[i] = buffer.toString();
		}
		index = 0;
	}
	
	/**
	 * Implement the method hasMoreLines() that returns true if the file has more lines
to process or false if we have achieved the end of the simulated file.
	 * @return
	 */
	public boolean hasMoreLines() {
		return index < content.length;
	}
	
	/**
	 * Implement the method getLine() that returns the line determined by the index
attribute and increases its value.
	 * @return
	 */
	public String getLine() {
		if(this.hasMoreLines()) {
			System.out.println("Mock: " + (content.length - index));
			return content[index++];
		}
		return null;
	}
}
package com.bird.concursey.charpet4;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * implement a class named Buffer that will implement the buffer shared by
 * producers and consumers.
 * 
 * @author bird 2014 9 21    6:58:13
 */
public class Buffer {

	private LinkedList<String> buffer;
	private int maxSize;
	private ReentrantLock lock;
	private Condition lines;
	private Condition space;
	private boolean pendingLines;
	
	/**
	 * Implement the constructor of the class. It initializes all the attributes
described previously.
	 * @param maxSize
	 */
	public Buffer(int maxSize) {
		this.maxSize = maxSize;
		buffer = new LinkedList<String>();
		lock = new ReentrantLock();
		lines = lock.newCondition();
		space = lock.newCondition();
		pendingLines = true;
	}
	
	/**
	 * Implement the insert() method. It receives String as a parameter and tries
to store it in the buffer. First, it gets the control of the lock. When it has it, it then
checks if there is empty space in the buffer. If the buffer is full, it calls the await()
method in the space condition to wait for free space. The thread will be woken up
when another thread calls the signal() or signalAll() method in the space
Condition. When that happens, the thread stores the line in the buffer and calls
the signallAll() method over the lines condition. As we'll see in a moment, this
condition will wake up all the threads that were waiting for lines in the buffer.
	 * @param line
	 */
	public void insert(String line) {
		lock.lock();
		try {
			while(buffer.size() == maxSize) {
				space.await();
			}
			buffer.add(line);
			System.out.printf("%s: Inserted Line: %d
", Thread.currentThread().getName(),buffer.size()); lines.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } } /** * Implement the get() method. It returns the first string stored in the buffer. First, it gets the control of the lock. When it has it, it checks if there are lines in the buffer. If the buffer is empty, it calls the await() method in the lines condition to wait for lines in the buffer. This thread will be woken up when another thread calls the signal() or signalAll() method in the lines condition. When it happens, the method gets the first line in the buffer, calls the signalAll() method over the space condition and returns String. * @return */ public String get() { String line = null; lock.lock(); try { while((buffer.size() == 0) && (hasPendingLines())) { lines.await(); } if(hasPendingLines()) { line = buffer.poll(); System.out.printf("%s: Line Readed: %d
",Thread.currentThread().getName(),buffer.size()); space.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } return line; } /** * Implement the setPendingLines() method that establishes the value of the attribute pendingLines. It will be called by the producer when it has no more lines to produce. * @param pendingLines */ public void setPendingLines(boolean pendingLines) { this.pendingLines=pendingLines; } /** * Implement the hasPendingLines() method. It returns true if there are more lines to be processed, or false otherwise. * @return */ public boolean hasPendingLines() { return pendingLines || buffer.size() > 0; } }
package com.bird.concursey.charpet4;

public class Producer implements Runnable {
	
	private FileMock mock;
	
	private Buffer buffer;
	
	public Producer(FileMock mock, Buffer buffer) {
		this.mock = mock;
		this.buffer = buffer;
	}

	/**
	 * Implement the run() method that reads all the lines created in the FileMock
object and uses the insert() method to store them in the buffer. Once it finishes,
use the setPendingLines() method to alert the buffer that it's not going to
generate more lines.
	 */
	@Override
	public void run() {
		buffer.setPendingLines(true);
		while(mock.hasMoreLines()) {
			String line = mock.getLine();
			buffer.insert(line);
		}
		buffer.setPendingLines(false);
	}

}
package com.bird.concursey.charpet4;

import java.util.Random;

public class Consumer implements Runnable {
	
	private Buffer buffer;
	
	public Consumer(Buffer buffer) {
		this.buffer = buffer;
	}

	/**
	 * Implement the run() method. While the buffer has pending lines, it tries to get one
and process it.
	 */
	@Override
	public void run() {
		while(buffer.hasPendingLines()) {
			String line = buffer.get();
			processLine(line);
		}
	}
	
	/**
	 * Implement the auxiliary method processLine(). It only sleeps for 10 milliseconds
to simulate some kind of processing with the line.
	 * @param line
	 */
	private void processLine(String line) {
		Random random = new Random();
		try {
			Thread.sleep(random.nextInt(100));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		FileMock fileMock = new FileMock(100, 10);
		Buffer buffer = new Buffer(20);
		
		Producer producer = new Producer(fileMock, buffer);
		Thread threadProducer = new Thread(producer, "Producer");
		
		Consumer consumers[] = new Consumer[3];
		Thread threadConsumers[] = new Thread[3];
		for(int i = 0; i < 3; i++) {
			consumers[i] = new Consumer(buffer);
			threadConsumers[i] = new Thread(consumers[i], "consumer " + i);
		}
		
		threadProducer.start();
		for(int i = 0; i < 3; i++) {
			threadConsumers[i].start();
		}
	}

}

注意:
When a thread calls the await() method of a condition, it automatically frees the control of the lock, so that another thread can get it and begin the execution of the same, or another critical section protected by that lock.