JAva同時(4)読者執筆者問題のいくつかの実現方法と性能比較

7398 ワード

読者-ライターの質問
1、読みが他の読みをブロックしない
2、リードブロックその他の書き込み
3、書き込みが他の読み取りと他の書き込みをブロックする
 
問題は3つに分けられる:読者優先、書く者優先、公平な競争
 
読者優先:現在実行中がリードスレッドの場合、後続のリードスレッドをブロックせずに直接読むことができます.
現在書き込みなしスレッドが実行されている場合は、ブロックされた読み取りスレッドまたは書き込みスレッドをランダムに選択して実行します.
≪書込み優先|Writer Priority|oem_src≫:現在読取り、書込みスレッドの実行にかかわらず、ブロックされた書込みスレッドを優先的に選択します.
ブロックされたライトスレッドがない場合にのみ、ブロックされたリードプログラムが実行されます.
公正競争:リード・ライト・スレッドは先着順(FIFO)で順次実行され、キュー・データ構造が必要
 
JAvaは、wait()とnotify()、synchronized、Semaphore信号量、jdk 1を用いるなど、この問題を解決するために多くの一般的な方法がある.5+の他の同時技術については,読者優先を例に説明する.
 
1、リードロックとライトロックを使って解決する.この方法は最も簡単で直感的で、性能も比較的に良い.
 
package com.xx.concurrent.commonUse;

import java.util.concurrent.CountDownLatch;

public class ReaderAndWriterWithMonitor {
	
	//  
	static Object w = new Object();
	//  
	static Object r = new Object();
	
	//   
	static int data = 0 ;

	static CountDownLatch latch = new CountDownLatch(150);

	class Reader extends Thread {

		int quantity;

		Reader(int quantity) {
			this.quantity = quantity;
		}

		@Override
		public void run() {
			synchronized (w) {
					while (quantity > 0) {
						System.out.println(getName()
								+ " is reading ...【data=" + data + "】");
						quantity--;
					}
					latch.countDown();
			}
		}
	}

	class Writer extends Thread {
		int quantity;

		Writer(int quantity) {
			this.quantity = quantity;
		}

		@Override
		public void run() {
			synchronized (w) {
				synchronized (r) {
					while (quantity > 0) {
						data++;
						System.out.println(getName() + " is writing...【data=" + data + "】");
						quantity--;
					}
					latch.countDown();
				}
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		long startTime = System.nanoTime();
		ReaderAndWriterWithMonitor demo = new ReaderAndWriterWithMonitor();
		for (int i = 0; i < 100; ++i) {
			demo.new Reader(10).start();
		}
		for (int i = 0; i < 50; ++i) {
			demo.new Writer(1).start();
		}

		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime + "ns");
	}

}

2、使用信号量
package com.xx.concurrent.commonUse;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ReaderAndWriter {
	
	static Semaphore mutex = new Semaphore(1);
	static Semaphore w = new Semaphore(1);
	static int readcnt = 0 ;
	static CountDownLatch latch = new CountDownLatch(150);
	static int data = 0;
	
	class Reader extends Thread{
		
		int quantity;
		
		Reader(int quantity){
			this.quantity = quantity;
		}
		
		@Override
		public void run(){
			while(quantity > 0){
				try {
					mutex.acquire();
					readcnt++;
					if (readcnt == 1)
						w.acquire();
					mutex.release();
					//read something
					System.out.println(getName() + " is reading ...【data=" + data + "】");
					mutex.acquire();
					readcnt--;
					if (readcnt == 0)
						w.release();
					mutex.release();
					quantity--;
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			latch.countDown();
		}
	}
	
	class Writer extends Thread{
		int quantity;
		
		Writer(int quantity){
			this.quantity = quantity;
		}
		
		@Override
		public void run(){
			while(quantity>0){
				try {
					w.acquire();
					data++;
					System.out.println(getName() + " is writing ...【data=" + data + "】");
					w.release();
					quantity--;
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			latch.countDown();
		}
	}
	
	
	public static void main(String[] args) throws InterruptedException {
		long startTime = System.nanoTime();
		ReaderAndWriter demo = new ReaderAndWriter();
		ExecutorService service = Executors.newFixedThreadPool(150);
		for(int i=0; i< 100; ++i){
			 service.execute(demo.new Reader(10));
		}
		for(int i=0 ; i< 50; ++i){
			service.execute(demo.new Writer(1));
		}
		latch.await();
		service.shutdown();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime + "ns");
		
	}
}

 
3、waitとnotify方式を使う
package com.xx.concurrent.commonUse;

import java.util.concurrent.CountDownLatch;

public class ReaderAndWriterWithWaitNotify {

	static Object w = new Object();

	static int readcnt = 0;
	static int writecnt = 0;
	
	static int data = 0;

	static CountDownLatch latch = new CountDownLatch(150);

	class Reader extends Thread {

		int quantity;

		Reader(int quantity) {
			this.quantity = quantity;
		}

		@Override
		public void run() {
			synchronized (w) {
				try {
					while (writecnt > 0) {
						w.wait();
					}
					readcnt++;
					while (quantity > 0) {
						System.out.println(getName() + " is reading...【data=" + data + "】" );
						quantity--;
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				readcnt--;
				w.notify();
				latch.countDown();
			}
		}
	}

	class Writer extends Thread {
		int quantity;

		Writer(int quantity) {
			this.quantity = quantity;
		}

		@Override
		public void run() {
			synchronized (w) {
				while (readcnt > 0 || writecnt > 0) {
					try {
						w.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				writecnt++;
				while (quantity > 0) {
					data++;
					System.out.println(getName() + " is writing...【data=" + data + "】" );
					quantity--;
				}
				writecnt--;
				w.notify();
				latch.countDown();
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		long startTime = System.nanoTime();
		ReaderAndWriterWithWaitNotify demo = new ReaderAndWriterWithWaitNotify();
		for (int i = 0; i < 100; ++i) {
			demo.new Reader(10).start();
		}
		for (int i = 0; i < 50; ++i) {
			demo.new Writer(1).start();
		}

		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime + "ns");

	}

}