Java Concurrent (4)


ここではEffective Java by Joshua BlochとConcurrent Programming in Java by Doug Leaから抜粋する.
 
1.7.1 synchronizedの使用の制限    内部のsynchronizedメソッドとブロックは、ロックベースの多くのアプリケーションを満たすことができますが、以下の制限があります.
  • あるスレッドがロックを取得しようとしたが、このロックが他のスレッドによって保持されている場合、ロールバックすることもできず、しばらく待ってから待機を放棄することもできず、またはある中断後にロックを取得しようとする企みを取り消すこともできず、スレッドが活性の問題から回復しにくい.
  • は、再入性、読み書き保護、公平性などのロックの意味形式を変更することはできません.
  • には同期アクセス制御がなく、いずれの方法でもアクセス可能なオブジェクトに対してsynchronized(obj)操作を実行することができ、これにより、必要なロックが占有されているため、サービスを拒否する問題が発生する.
  • 法とブロック内の同期により、厳格なブロック構造に対してのみロックを使用することができる.例えば、1つの方法ではロックが得られず、別の方法ではロックが解放される.

  • 1.7.2 util.concurrentキット    util.concurrentツールパッケージは、基本的なJava同期ツール(synchronization tools)の上にDoug Leaが作成した高品質、高効率、意味的に正確なスレッド制御構造ツールパッケージです.いくつかのインタフェースと実装について簡単に説明します.
     
    1.7.2.1 ReentrantLock     ReentrantLockは、内部ロックと同じ反発、再入力、メモリの可視性を保証し、明示的に解放する必要があります.ReentrantLockは、ブロック構造ロックではなく、中断可能、タイミング可能である.Java 5では、ReentrantLockのパフォーマンスは内部ロックよりはるかに高い.Java 6では、内部ロックを管理するアルゴリズムにReentrantLockと同様のアルゴリズムが採用されているため、内部ロックとReentrantLockの性能差は大きくない.    ReentrantLockのコンストラクション関数には、2つの公平性の選択があります.非公平ロック(デフォルト)または公平ロックを作成します.フェアロックでは、ロックが他のスレッドによって占有されている場合、リクエストスレッドは待機キューに追加され、順番にロックが取得されます.非フェアロックでは、ロックが要求されたときにロックのステータスが使用可能である場合、リクエストスレッドは、待機キューにスレッドが待機しているかどうかにかかわらず、ロックを直接取得することができる.フェアロックの代価は、スレッドの停止と再開のパフォーマンスオーバーヘッドをより多くすることです.多くの場合、非公平ロックの性能は公平ロックよりも高い.Java内部ロックも、特定の公平性の保証を提供しておらず、Java言語仕様もJVMに内部ロックを公平に実現するように要求していないため、ReentrantLockはロックの公平性を減らしていない.次はReentrantLockの一例です.
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BoundedBuffer<T> {
    	//
    	private int head;
    	private int tail;
    	private int count;
    	private final T buffer[];
    	
    	//
    	private final Lock lock = new ReentrantLock();
    	private final Condition notEmpty = lock.newCondition();
    	private final Condition notFull = lock.newCondition();
    	
    	@SuppressWarnings("unchecked")
    	public BoundedBuffer(int capacity) {
    		this.buffer = (T[]) new Object[capacity];
    	}
    	
    	public T take() throws InterruptedException {
    		lock.lock();
    		try {
    			while(isEmpty()) {
    				notEmpty.await();
    			}
    			
    			T t = doTake();
    			
    			notFull.signal();
    			
    			return t;
    		} finally {
    			lock.unlock();
    		}
    	}
    
    	public void put(T t) throws InterruptedException {
    		lock.lock();
    		try {
    			while(isFull()) {
    				notFull.await();
    			}
    			
    			doPut(t);
    			
    			notEmpty.signal();
    		} finally {
    			lock.unlock();
    		}
    	}
    	
    	private boolean isEmpty() {
    		return count == 0;
    	}
    	
    	private boolean isFull() {
    		return count == buffer.length;
    	}
    	
    	private T doTake() {
    		T t = buffer[head];
    		buffer[head] = null;
    		if(++head == buffer.length) {
    			head = 0;
    		}
    		--count;
    		return t;
    	}
    
    	private void doPut(T t) {
    		buffer[tail] = t;
    		if(++tail == buffer.length) {
    			tail = 0;
    		}
    		++count;
    	}
    }

    1.7.2.2 Mutex     Mutexクラス(反発排他ロックmutual exclusion lock)の書き込みは、
    public class Mutex implemets Sync
    {
        public void acquire() throws InterruptedException;
        public void release();
        public boolean attempt(long msec) throws InterruptedException;
    }

        acquireは同期ブロックの入口動作と類似しており,releaseと同期ブロックの解放ロック動作は類似している.attempt操作は、所定の時間内にロックを取得してこそtrueを返す.0は合法的で、ロックが得られなければ待つ必要はないことを示しています.組み込みの同期メカニズムとは異なり、現在のスレッドがロックを取得しようとしている間に中断されると、acquireメソッドとattemptメソッドはInterruptedException異常を放出し、使用の複雑さを増すが、応答の良い堅牢なコードを記述してキャンセル操作を処理するメカニズムを提供する.synchronizedメソッドやブロックとは異なり、標準のMutexクラスは再入力できません.ロックがacquireを実行するスレッドによって保持されている場合、このスレッドがacquireを呼び出し続けると、ブロックされます.ReentrantLockは再入可能なロックです.
     
    1.7.2.3 Semaphore     信号量(Semaphore)は、同時制御における古典的な構成要素である.他のツールクラスと同様に、取得-リリースプロトコルも遵守します.概念的には、1つの信号量は、構造方法で初期化されたライセンスのセットを維持している.必要に応じて、acquire操作のたびにライセンスが使用可能になるまでブロックされ、このライセンスを占有します.attemptメソッドは同様の操作を実行しますが、タイムアウト時に失敗して終了することができます.releaseのたびにライセンスが追加されます.しかし、実際には実際のライセンスオブジェクトは使用されておらず、信号量は現在使用可能なライセンスの数を知り、関連する操作を実行するだけでよい.Mutexはライセンス数が1のSemaphoreと見なすことができる.次に、信号量の一例を示す.
    public class SyncQueue implements Queue
    {
        private final Queue mQueue;
        private final int mCapacity;
        private final Semaphore mSemProducer;
        private final Semaphore mSemConsumer;	
    
        public SyncQueue(Queue queue)
        {
            this(queue, Integer.MAX_VALUE);
        }
    
        public SyncQueue(Queue queue, int capacity)
        {
            mQueue = queue;
            mCapacity = capacity;
            mSemProducer = new Semaphore(capacity);
            mSemConsumer = new Semaphore(0);
        }
    
        public Object get()
        {
            // Accquire consumer's semaphore
            try
            {
                mSemConsumer.acquire();
            }
            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
                return null;
            }
    	
            // Get the item
            Object item = null;
            synchronized(mQueue)
            {		
                item = mQueue.get();
            }
    	
            //
            mSemProducer.release();
            return item;
        }
    	
        public boolean put(Object item)
        {
            // Precondition checking
            if(item == null)
            {
                return false;
            }
    
            // Accquire producer's semaphore
            try
            {
                mSemProducer.acquire();
            }
            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
                return false;
            }
    	
            // Add the item
            synchronized(mQueue)
            {
                mQueue.put(item);
            }
    	
            // Release consumer's semaphore
            mSemConsumer.release();
            return true;
        }
    }
     
    1.7.2.4 Latch     閉鎖(latch)とは、ある値が得られると変化しない変数または条件を指す.二元閉鎖変数または条件(通常は閉鎖となる)の値は、一度だけ、すなわちその初期化状態からその最終状態に変更することができる.閉鎖に関連する同時制御技術はLatchクラスにカプセル化され,汎用的な獲得−解放プロトコルを遵守する.しかし、release操作は、すべての前または後のacquire操作を実行に戻すことを意味します.    閉鎖の拡張の1つはカウントダウンカウンタであり、そのacquire操作はrelease操作で一定の回数を実行し、1回後に実行を再開するだけではない.閉鎖、逆数カウンタ、およびそれらの基礎の上に確立された簡単なツールクラスは、これらの条件の応答動作を処理するために使用することができる.
  • インジケータを完了します.たとえば、一部の操作が実行されるまでスレッドを強制します.
  • タイミングバルブ値.たとえば、ある時期にスレッドのセットがトリガーされます.
  • イベント指示.例えば、特定のメッセージを受信したり、特定のボタンが押されたりしてこそ続行できる操作がトリガーされる.
  • エラー表示.例えば、グローバルなクローズ者が実行するときに実行できるスレッド
  • のセットがトリガーされる.
    1.7.2.5 Barrier
       Barrierは、ロックとは異なり、ロックが待機しているのはイベントであり、barrierが待機しているのはスレッドであるというスレッドのセットをブロックすることができる.CyclicBarrierは、所与の数のスレッドを1つのbarrier pointに複数回集中させることを可能にする.あるスレッドがawaitメソッドを呼び出すとブロックされ、すべてのスレッドがawaitメソッドを呼び出すとbarrierが突破され、すべてのスレッドが実行され続け、barrierもresetされて次の使用に備えられる.await呼び出しがタイムアウトしたり、ブロック内のスレッドが中断されたりした場合、barrierは失敗したと考えられ、未完了のawait呼び出しはすべてBrokenBarrierExceptionによって終了します.await呼び出しが成功すると、一意の到着インデックス番号が返されます.CyclicBarrierでは、構造関数にRunnable型のbarrier actionを渡すこともできます.barrierを通過することに成功すると実行されます.CyclicBarrierの例を次に示します.
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    
    public class Solver {
    	//
    	private final String[][] data;
    	private final CyclicBarrier barrier;
    	private final CountDownLatch latch;
    
    	public Solver(String[][] data) {
    		this.data = data;
    		this.barrier = new CyclicBarrier(data.length, new BarrierAction());
    		this.latch = new CountDownLatch(data.length);
    	}
    	
    	public void start() {
    		//
    		for (int i = 0; i < data.length; ++i)  {
    			new Thread(new Worker("worker" + i, this.data[i])).start();
    		}
    		
    		//
    		try {
    			latch.await();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public static void main(String args[]) {
    		String[][] data = new String[][]{{"a1", "a2", "a3"}, {"b1", "b2", "b3"}, {"c1", "c2", "c3"}};
    		Solver solver = new Solver(data);
    		solver.start();
    	}
    	
    	private class BarrierAction implements Runnable {
    		public void run() {
    			System.out.println(Thread.currentThread().getName() + " is processing barrier action");
    		}
    	}
    	
    	private class Worker implements Runnable {
    		//
    		private String name;
    		private String[] row;
    		
    		Worker(String name, String[] row) {
    			this.name = name;
    			this.row = row; 
    		}
    		
    		public void run() {
    			for(int i = 0; i < row.length; i++) {
    				System.out.println(name + " is processing row[" + i +"]" + row[i]);
    				
    				try {
    					barrier.await(); 
    				} catch (InterruptedException ex) { 
    					break; 
    				} catch (BrokenBarrierException ex) { 
    					break; 
    				}
    			}
    			
    			//
    			latch.countDown();
    		}
    	}
    } 

     
    1.8同時処理の実践    集合クラスを設計したと仮定し、マルチスレッド環境での遍歴方法を提供したいと思います.この設計問題には、一般的に3つの解決策があります.同期集計操作、インデックス化遍歴、バージョン化反復変数です.各方法には設計のメリットとデメリットがあります.1.8.1同期集計操作    1つの安全な使用列挙方法は、各要素に作用する操作を抽出することであり、synchronized applyToAllメソッドのパラメータ(C/C++の関数ポインタ、javaのインタフェース、または閉パッケージ)として使用することができる.例:
    public interface Procedure
    {
        void apply(Object obj);
    }
    
    public class Vector
    {
        public syncronized void applyToAll(Procedure p)
        {
            for(int i = 0;  i < size; i++)
            {
                p.apply(data[i]);
            }
        }
    }

        この方法は,遍歴中に他のスレッドが集合要素を増加または減少させる可能性のある干渉を除去するが,その代価として集合のロックを持つ時間が長すぎる.
     
    1.8.2インデックスパスとクライアントロック    このような遍歴ポリシーは、クライアントがインデックスのアクセス方法を使用して遍歴することを要求します.たとえば、次のようにします.
    for(int i = 0;  i < v.size(); i++)
    {
        System.out.println(v.get(i));
    }

        size()ではget(int)メソッドは同期されていますが、v.size()メソッドのような細いロッククラス度によって生じる潜在的な競合を処理するために成功する可能性がありますが、その後、別のスレッドが最後の要素を削除し、v.get(i)を呼び出すとエラーが発生する可能性があります.この問題を解決する1つの方法は、クライアントロックを使用して、サイズチェックとアクセスの原子性を保証することです.    この方法は比較的柔軟に用いられるが,破損したパッケージを代価として前提しており,正確かどうかはVector内部実装の理解度に依存する.
     
    1.8.3バージョン化反復変数    これは、ループメソッドでは、失敗した反復変数、すなわち放棄された反復変数をサポートする集合クラスに関連し、ループ中に集合要素が変更されると、反復操作は例外を放出します.このポリシーを実装する最も簡単な方法は、セットを更新するたびに増加する反復操作のバージョン番号を維持することです.反復変数が次の要素にアクセスするたびに、このバージョン番号が変更された場合、例外が放出されます.このバージョン番号は、1回の遍歴中にバージョン番号がループしないように十分大きいはずです.一般的には、整形(int)で十分です.    Java言語集合フレームワークのjava.util.Iteratorは、このポリシーを使用します.ConcurrentModificationExceptionでは、スレッド間で計画的で見たくないインタラクションがあることがよく説明されていますが、これらの問題の修正は例外処理コードだけでは不十分なことが多いです.コレクションクラスでは、これらの反復変数の上に集約ループまたはクライアントロックを使用することができるため、反復変数をバージョン化するのは良い選択です.