Javaでのマルチスレッドのステップアップ編

11025 ワード

この1編は主にEffecttive Java同時章を学ぶノートです.
1、同期アクセス共有の可変データ
同期がなければ、1つのスレッドの変化は他のスレッドに見られません.同期は、1つのスレッドがオブジェクトが一致しない状態にあることを阻止するだけでなく、同期メソッドまたは同期コードブロックに入る各スレッドが、同じロックによって保護される前にすべての変更効果を保証することができます.
スレッド間で信頼性の高い通信を行うためにも,反発アクセスのためにも同期が必要である.
1、Threadを使わないでください.stopは1つのスレッドが別のスレッドのタスクを妨げることを阻止し、JavaではThreadが提供されている.stopメソッドですが、このメソッドは本質的に安全ではありません.使用するとデータが破壊されます.
あるスレッドが別のスレッドを妨げるのを阻止するには、提案する方法は、最初はfalseであったが、最初のスレッドが自分を終了することを示すために2番目のスレッドをtrueに設定することができるbooleanドメインをポーリングすることです.booleanドメインの読み書き操作はすべて原子であるため、プログラマはこのドメインにアクセスするときに同期を使用しません.エラー例:

//        :           !
private static boolean stopRequested;
    private void test_1(){
        Thread backgroundThread=new Thread(new Runnable() {
            @Override
            public void run() {
                int i=0;
                LogUtils.w("stopRequested="+stopRequested);
                while(!stopRequested){
                    i++;
                    LogUtils.w("i="+i);
                }
            }
        });

        backgroundThread.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        stopRequested=true;
    }


正しい例1:
//           
    private static boolean stopRequested;

    private static synchronized void requestStop(){
        stopRequested=true;

    }
    private static synchronized boolean stopRequested(){
        return stopRequested;
    }

    private void test_2(){
        Thread backgroundThread=new Thread(new Runnable() {
            @Override
            public void run() {
                int i=0;
                while(!stopRequested()){
                    i++;
                    LogUtils.w("i="+i);
                }
            }
        });

        backgroundThread.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        requestStop();

    }


正しい例2:
//volatile          ,                                。
private static volatile boolean stopRequested;


    private void test_2(){
        Thread backgroundThread=new Thread(new Runnable() {
            @Override
            public void run() {
                int i=0;
                while(!stopRequested){
                    i++;
                    LogUtils.w("i="+i);
                }
            }
        });

        backgroundThread.start();

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        stopRequested=true;

    }


提案:最良の方法は、可変データを共有しないか、可変データを共有しないかです.すなわち、可変データは単一スレッドに制限される.
2、過度の同期
/**   */
public class ForwardingSet implements Set {
    private final Set s;

    public ForwardingSet(Set s) {
        this.s = s;
    }

    @Override
    public int size() {
        return 0;
    }

    @Override
    public boolean isEmpty() {
        return s.isEmpty();
    }

    @Override
    public boolean contains(Object o) {
        return s.contains(o);
    }

    @NonNull
    @Override
    public Iterator iterator() {
        return s.iterator();
    }

    @NonNull
    @Override
    public Object[] toArray() {
        return s.toArray();
    }

    @NonNull
    @Override
    public  T[] toArray(T[] a) {
        return s.toArray(a);
    }

    @Override
    public boolean add(E e) {
        return s.add(e);
    }

    @Override
    public boolean remove(Object o) {
        return s.remove(o);
    }

    @Override
    public boolean containsAll(Collection> c) {
        return s.containsAll(c);
    }

    @Override
    public boolean addAll(Collection extends E> c) {
        return s.addAll(c);
    }

    @Override
    public boolean retainAll(Collection> c) {
        return s.retainAll(c);
    }

    @Override
    public boolean removeAll(Collection> c) {
        return s.removeAll(c);
    }

    @Override
    public void clear() {
        s.clear();
    }

    @Override
    public boolean equals(Object obj) {
        return s.equals(obj);
    }

    @Override
    public int hashCode() {
        return s.hashCode();
    }

    @Override
    public String toString() {
        return s.toString();
    }


}

/**       */
public class ObservableSet extends ForwardingSet {

    public ObservableSet(Set s) {
        super(s);
    }

    private final List> observers=new ArrayList<>();

    public void addObserver(SetObserver observer){
        synchronized (observers){
            observers.add(observer);
        }
    }

    public boolean removeObserver(SetObserver observer){
        synchronized (observers){
            return observers.remove(observer);
        }
    }

    private void notifyElementAdded(E element){
        synchronized (observers){
            for(SetObserver observer: observers){
                observer.added(this,element);
            }
        }
    }

    public  boolean add(E element){
        boolean added=super.add(element);
        if(added){
            notifyElementAdded(element);
        }
        return added;
    }

    public boolean addAll(Collection extends E> c){
        boolean result=false;
        for(E elemnet:c){
            result|=add(elemnet);
        }
        return result;
    }

    
    public interface SetObserver{
        void added(ObservableSet set,E element);
    }
}

次のテストでは、反復は同期されたブロックであり、同時変更は防止されますが、反復スレッド自体が観察可能なセットにコールバックすることは防止されません.また、observersリストの変更は防止されません.
/**
     *    ConcurrentModificationException            
     *         ???
     */
    private void test_1() {
        ObservableSet set=new ObservableSet(new HashSet());

        set.addObserver(new ObservableSet.SetObserver() {
            @Override
            public void added(ObservableSet set, Integer element) {
                Log.w("addObserver","element="+element);
                if(element==23){
                    set.removeObserver(this);
                }
            }
        });

        for(int i=0;i<100;i++){
            LogUtils.w(this,"i="+i);
            set.add(i);
        }
    }


バックグラウンドスレッドはs.removeObserverを呼び出し、observersをロックしようとしたが、プライマリスレッドにロックがあるため、ロックを取得できなかった.その間、プライマリ・スレッドはバックグラウンド・スレッドを待って観察者の削除を完了し、デッドロックの原因となります.
/**    */
private void test_2() {
        ObservableSet set=new ObservableSet(new HashSet());

        set.addObserver(new ObservableSet.SetObserver() {
            @Override
            public void added(final ObservableSet s, Integer element) {
                Log.w("addObserver","element="+element);
                if(element==23){
                    ExecutorService executor= Executors.newSingleThreadExecutor();
                    final ObservableSet.SetObserver observer=this;
                    try {
                        executor.submit(new Runnable() {
                            @Override
                            public void run() {
                                s.removeObserver(observer);
                            }
                        }).get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        for(int i=0;i<100;i++){
            LogUtils.w(this,"i="+i);
            set.add(i);
        }
    }

メソッドを変更して、外部メソッドの呼び出しを同期コードブロックから削除し、同時集合CopyOnWriteArrayListを使用します.
public class ObservableSet extends ForwardingSet {

    public ObservableSet(Set s) {
        super(s);
    }


//    private final List> observers=new ArrayList<>();
    private final List> observers=new CopyOnWriteArrayList<>();

    public void addObserver(SetObserver observer){
            observers.add(observer);
    }

    public boolean removeObserver(SetObserver observer){
            return observers.remove(observer);
    }

    private void notifyElementAdded(E element){
            for(SetObserver observer: observers){
                observer.added(this,element);
            }
    }

    public  boolean add(E element){
        boolean added=super.add(element);
        if(added){
            notifyElementAdded(element);
        }
        return added;
    }

    public boolean addAll(Collection extends E> c){
        boolean result=false;
        for(E elemnet:c){
            result|=add(elemnet);
        }
        return result;
    }


    public interface SetObserver{
        void added(ObservableSet set,E element);
    }
}


同期領域でできるだけ少ない仕事をしなければなりません.過度な同期は行わないでください.マルチコアの時代、過度な同期の実際のコストは、ロックを取得するのにかかるcpu時間ではなく、並列の機会を失ったこと、および各コアに一貫したメモリビューが必要であることによる遅延を指します.過剰同期のもう一つの潜在的なオーバーヘッドは、VM最適化コードの実行能力を制限することです.StringButterインスタンスは、ほとんど単一スレッドで使用されますが、内部同期が実行されます.したがって、StringButterは基本的にStringBuilderに取って代わられます.
executorとtaskはスレッドより優先
Javaプラットフォームにjavaが追加されました.util.concurrent.このパッケージには、インタフェースベースの柔軟なタスク実行ツールであるExecutor Frameworkが含まれています.コードを1行で作成するだけで、さまざまな面で優れたワークキューが作成されます.
ExecutorService executor=Executors.newSingleThreadExecutor();
//    runnable  
executor.execute(runnable);
//executor  (        ,          )
executor.shutdown();

executorサービスを利用して多くのことを行うことができます.
  • タスクの完了を待つ
  • タスクセット内のタスクまたはすべてのタスクの完了を待つ(invokeAnyまたはinvokeAllメソッドを使用)
  • executor serviceが優雅に終了するのを待つ(awaitTermination法を利用する)
  • タスクの完了時にこれらのタスクの結果を1つずつ取得する(ExecutorCompletionServiceを利用する)このキューからのリクエストを1つ以上のスレッドで処理したい場合は、異なる静的ファクトリを呼び出すだけで、このファクトリはスレッドプールと呼ばれる異なるexecutor serviceを作成します.スレッドプールの操作のほとんどを制御できるThreadPoolExecutorクラスを直接使用できます.

  • ウィジェットを作成する場合、またはサーバをリロードする場合はExecutorsを使用します.新CachedThreadPoolは、サーバの負荷が重すぎて、すべてのCPUが完全に占有されている場合に良い選択です.新CachedThreadPoolは、より多くのタスクがある場合、より多くのスレッドを作成し、状況をさらに悪化させるだけです.したがって、大負荷の製品サービスではExecutorsを使用することが望ましい.新FixedThteadPool(固定数のスレッドプール).それを最大限に制御するには、ThreadPoolExecutorクラスを使用します.
    タスクは2種類、Runnableとその近親Callable(戻り値あり)
    本質的には、Executor Fameworkが行う仕事は実行です.Executor Frameworkにもjavaに代わるものがあります.util.Timerのもので、timerは1つのスレッドだけでタスクを実行します.これは、長期にわたって実行されているタスクに直面すると、タイミングの正確性に影響します.timerの一意のスレッドがキャプチャされていない例外を投げ出すと、timerは実行を停止します.ScheduledThreadPoolExecutorを使用すると、複数のスレッドがサポートされ、未検出の例外が投げ出されたタスクから優雅に復元されます.Java同時トピックを参照:Timerの欠陥をScheduledExecutorServiceで置き換える
    未完待続~