コンカレント(Concurrency)

16019 ワード

Synchronizationにはどのような役割がありますか?
  • mutual exclusion(反発):shared mutable dataにアクセスするとき
  • 、異なるスレッドがアクセスするオブジェクトが一致する状態になることを防止する.
  • reliable communication between threads(スレッド間の信頼性の高い通信):1つのスレッドで前のスレッドの変更結果
  • が表示されます.
    atomicに対する理解
  • reading or writing a variable(longとdoubleタイプを除く)は原子性の
  • である
    Synchronization,volatile,AtomicLongが正しく使用されているいくつかの小さな例
    // Broken! - How long would you expect this program to run?
    //boolean , (mutual exclusion), 
    public class StopThread {
        private static boolean stopRequested;
        public static void main(String[] args)
                throws InterruptedException {
            Thread backgroundThread = new Thread(() -> {
                int i = 0;
                while (!stopRequested)
                    i++;
            });
            backgroundThread.start();
            TimeUnit.SECONDS.sleep(1);
            stopRequested = true;
        }
    }
    
    //  
    // Properly synchronized cooperative thread termination
    // Synchronization is not guaranteed to work unless both read and write operations are synchronized
    public class StopThread {
        private static boolean stopRequested;
        private static synchronized void requestStop() {
            stopRequested = true;
        }
        private static synchronized boolean stopRequested() {
            return stopRequested;
        }
        public static void main(String[] args)
                throws InterruptedException {
            Thread backgroundThread = new Thread(() -> {
                int i = 0;
                while (!stopRequested())
                    i++;
            });
            backgroundThread.start();
            TimeUnit.SECONDS.sleep(1);
            requestStop();
        }
    }
    
    //  
    // Cooperative thread termination with a volatile field
    // volatile  reliable communication between threads, mutual exclusion
    public class StopThread {
        private static volatile boolean stopRequested;
        public static void main(String[] args)
                throws InterruptedException {
            Thread backgroundThread = new Thread(() -> {
                int i = 0;
                while (!stopRequested)
                    i++;
            });
            backgroundThread.start();
            TimeUnit.SECONDS.sleep(1);
            stopRequested = true;
        }
    }
    
    // Broken - requires synchronization!
    // increment operator (++) is not atomic: first it reads the value, and then it writes back a new value, equal to the old value plus one
    private static volatile int nextSerialNumber = 0;
    public static int generateSerialNumber() {
        return nextSerialNumber++;
    }
    
    // Lock-free synchronization with java.util.concurrent.atomic
    // java.util.concurrent.atomic: This package provides primitives for lock-free, thread-safe programming on single variables
    private static final AtomicLong nextSerialNum = new AtomicLong();
    public static long generateSerialNumber() {
        return nextSerialNum.getAndIncrement();
    }
    

    synchronizationを使用する場合、何に注意すればいいですか?
  • synchronized regions内部、do as little work as possible
  • Obtain the lock, examine the shared data, transform it as necessary, and drop the lock. 
    If you must perform some time-consuming activity, find a way to move it out of the synchronized region
    
  • synchronizationを過度に使用すると、プログラム異常またはデッドロック
  • を引き起こす可能性がある.
    // Broken - invokes alien method from synchronized block!
    public class ObservableSet extends ForwardingSet {
        public ObservableSet(Set set) { super(set); }
    
        private final List> observers
                = new ArrayList<>();
    
        public void addObserver(SetObserver observer) {
            synchronized(observers) {
                observers.add(observer);
            }
        }
    
        public boolean removeObserver(SetObserver observer) {
            synchronized(observers) {
                return observers.remove(observer);
            }
        }
    
        private void notifyElementAdded(E element) {
            synchronized(observers) {
                for (SetObserver observer : observers)
                    observer.added(this, element); // alien method( )
            }
        }
    
        @Override public boolean add(E element) {
            boolean added = super.add(element);
            if (added)
                notifyElementAdded(element);
            return added;
        }
    
        @Override public boolean addAll(Collection extends E> c) {
            boolean result = false;
            for (E element : c)
                result |= add(element);  // Calls notifyElementAdded
            return result;
        }
    }
    
    @FunctionalInterface public interface SetObserver {
        // Invoked when an element is added to the observable set
        void added(ObservableSet set, E element);
    }
    
    public static void main(String[] args) {
        ObservableSet set = new ObservableSet<>(new HashSet<>());
    
       // work fine
        set.addObserver((s, e) -> System.out.println(e));
    
        for (int i = 0; i < 100; i++)
            set.add(i);
    }
    
    // throw ConcurrentModificationException
    set.addObserver(new SetObserver<>() {
        public void added(ObservableSet s, Integer e) {
            System.out.println(e);
            if (e == 23)
                s.removeObserver(this);
        }
    });
    
    // uses a background thread needlessly, deadlock
    set.addObserver(new SetObserver<>() {
       public void added(ObservableSet s, Integer e) {
          System.out.println(e);
          if (e == 23) {
             ExecutorService exec =
                   Executors.newSingleThreadExecutor();
             try {
                exec.submit(() -> s.removeObserver(this)).get();
             } catch (ExecutionException | InterruptedException ex) {
                throw new AssertionError(ex);
             } finally {
                exec.shutdown();
             }
          }
       }
    });
    
  • 上小例是正方法
  • //   :Alien method moved outside of synchronized block - open calls
    private void notifyElementAdded(E element) {
        List> snapshot = null;
        synchronized(observers) {
            snapshot = new ArrayList<>(observers);
        }
        for (SetObserver observer : snapshot)
            observer.added(this, element);
    }
    
    //  :Thread-safe observable set with CopyOnWriteArrayList
    //  rarely modified and often traversed, CopyOnWriteArrayList
    private final List> observers = new CopyOnWriteArrayList<>();
    public void addObserver(SetObserver observer) {
        observers.add(observer);
    }
    public boolean removeObserver(SetObserver observer) {
        return observers.remove(observer);
    }
    private void notifyElementAdded(E element) {
        for (SetObserver observer : observers)
            observer.added(this, element);
    }
    
  • は、StringBufferではなくStringBuilderを優先的に使用する.Javaを優先的に使用します.util.concurrent.javaではなくutil.Random

  • Concurrencyを使用する場合、注意すべきことは何ですか?
  • threads向けプログラミングではなくexecutors,tasks,and streams向けプログラミング
  • //Creating a work queue
    ExecutorService exec = Executors.newSingleThreadExecutor();
    //submit a runnable for execution
    exec.execute(runnable);
    //tell the executor to terminate gracefully
    exec.shutdown();
    //wait for a particular task to complete
    with the get method
    //wait for any or all of a collection of tasks to complete
    using the invokeAny or invokeAll methods
    //wait for the executor service to terminate
    using the awaitTermination method
    //retrieve the results of tasks one by one as they complete
    using an ExecutorCompletionService
    //schedule tasks to run at a particular time or to run periodically
    using a ScheduledThreadPoolExecutor
    //For a small program, or a lightly loaded server
    using Executors.newCachedThreadPool
    //For a heavily loaded production server
    using Executors.newFixedThreadPool
    

    Threadとexecutor frameworkの理解について
  • a Threadは同時に2つの役割を果たす:the unit of workとthe execution mechanism
  • In the executor framework,the unit of workとthe execution mechanismは別々である.the unit of workはtaskに抽象化され、taskを実行するメカニズムはexecutor serviceに抽象化される.2種類のtaskがある:RunnableとCallable(returns a valueでthrow任意のexceptionsが可能)
  • fork-joinに対する理解を話してください
  • Java 7,the Executor Frameworkはfork-join tasks
  • をサポートする
  • A fork-join taskは、a ForkJoinTaskを構成するthreadsがcess these tasksだけでなく、「steal」tasks from one another to ensure that all threads remain busy、higher CPU utilization、higher throughput、and lower latency
  • をもたらすForkJoinTask instanceによって表される.
    waitとnotifyの正確な使用の困難性を考慮すると、まずhigher-level concurrency utilitiesの使用を考慮すべきである.
    higher-level concurrency utilitiesはどの種類に分けられますか?
  • the Executor Framework
  • concurrent collections
  • synchronizers

  • concurrent collectionsの小例
  • Map’s putIfAbsent(key, value): inserts a mapping for a key if none was present and returns the previous value associated with the key, or null if there was none
  • // This method simulates the behavior of String.intern
    // Concurrent canonicalizing map atop ConcurrentMap - not optimal
    private static final ConcurrentMap map = new ConcurrentHashMap<>();
    
    public static String intern(String s) {
        String previousValue = map.putIfAbsent(s, s);
        return previousValue == null ? s : previousValue;
    }
    
    // Concurrent canonicalizing map atop ConcurrentMap - faster!
    public static String intern(String s) {
        String result = map.get(s);
        if (result == null) {
            result = map.putIfAbsent(s, s);
            if (result == null)
                result = s;
        }
        return result;
    }
    
  • はConcurrent collectionsを優先的に使用し、synchronized collectionsを使用するべきではありません.例えばuse ConcurrentHashMap,not use Collections.synchronizedMap
  • いくつかのcollection interfacesはBlockingQueue
  • のようなblocking operationsを実現した.
    Synchronizers(シンクロナイザ)の理解について
  • Synchronizers are objects that enable threads to wait for one another, allowing them to coordinate their activities
  • でよく使われるsynchronizersには、CountDownLatch、Semaphore
  • があります.
  • よく使われないsynchronizersには、CyclicBarrier、Exchanger
  • があります.
  • 最強synchronizers:Phaser
  • CountDownLatchの理解について
  • single-use barriersで、one or more threads to wait for one or more other threads to do something
  • を許可します.
  • The sole constructor for CountDownLatch takes an int that is the number of times the countDown method must be invoked on the latch before all waiting threads are allowed to proceed

  • CountDownLatchを使用した例を挙げます
  • suppose you want to build a simple framework for timing the concurrent execution of an action. This framework consists of a single method that takes an executor to execute the action, a concurrency level representing the number of actions to be executed concurrently, and a runnable representing the action. All of the worker threads ready themselves to run the action before the timer thread starts the clock. When the last worker thread is ready to run the action, the timer thread “fires the starting gun,” allowing the worker threads to perform the action. As soon as the last worker thread finishes performing the action, the timer thread stops the clock
  • // Simple framework for timing concurrent execution
    public static long time(Executor executor, int concurrency,
                Runnable action) throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(concurrency);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done  = new CountDownLatch(concurrency);
    
        for (int i = 0; i < concurrency; i++) {
            executor.execute(() -> {
                ready.countDown(); // Tell timer we're ready
                try {
                    start.await(); // Wait till peers are ready
                    action.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();  // Tell timer we're done
                }
            });
        }
    
        ready.await();     // Wait for all workers to be ready
        long startNanos = System.nanoTime();
        start.countDown(); // And they're off!
        done.await();      // Wait for all workers to finish
        return System.nanoTime() - startNanos;
    }
    
  • For interval timing, always use System.nanoTime rather than System.currentTimeMillis

  • 遺産コードを維持するためにwaitとnotifyを使用する場合、何に注意すればいいですか?
  • Always use the wait loop idiom to invoke the wait method
  • // The standard idiom for using the wait method
    synchronized (obj) {
        while ()
            obj.wait(); // (Releases lock, and reacquires on wakeup)
        ... // Perform action appropriate to condition
    }
    
  • notify methodに対してnotifyAll method
  • が優先的に使用される
    thread safetyのレベルはどれらがありますか?
  • Immutable:クラスの例は定数です.たとえば、String,Long,BigInteger
  • などです.
  • Unconditionally thread-safe:クラスの例は変数ですが、内部にはAtomicLong,ConcurrentHashMapなどの十分なsynchronizationが行われています.柔軟性、細粒度、安全な同時制御を実現するために、synchronized methodsではなく、Unconditionally thread-safeクラスでprivate final lock objectを使用することを奨励します.private lock objectはクラス外では見えないため、クライアントとthe object’s synchronizationは、
  • などのインタラクションを生成します.
    private final Object lock = new Object();
    public void foo() {
        synchronized(lock) {
            ...
        }
    }
    
  • Conditionally thread-safe:Unconditionally thread-safeと似ています.いくつかの方法では、安全な同時実行に使用するために外部synchronizationが必要です.例えば、いくつかの集合のCollections.synchronizedラップ
  • Not thread-safe:クラスのインスタンスは変数です.同時実行のために、クライアントは外部でsynchronizationを使用して各メソッドをラップする必要があります.たとえば、ArrayList、HashMap
  • などです.
  • Thread-hostile:クライアントがメソッドごとにsynchronizationを外部に追加しても、安全な同時実行には使用できません.modifying static data without synchronization
  • に由来することが多い
    Lazy initializationとは
  • は、フィールドの値が必要になるまで、フィールド
  • を初期化する.
    Lazy initializationを使用する場合、注意すべきことは何ですか?
  • スレッドセキュリティの観点から、ほとんどの場合、Lazy initialization
  • ではなくnormal initializationを使用するべきである.
  • 高性能を得るため、または有害な初期化サイクルを破るために、Lazy initializationを使用してフィールド
  • をインスタンス化することが考えられる.
    Lazy initializationにはどんな良い技術がありますか?
  • instance fieldsのlazy initializationについてdouble-check習わし
  • を用いる
    // Double-check idiom for lazy initialization of instance fields
    //  (result),  (field) , read only once, (result) 1.4 
    private volatile FieldType field;
    
    private FieldType getField() {
        FieldType result = field;
        if (result == null) {  // First check (no locking)
        synchronized(this) {
                if (field == null)  // Second check (with locking)
                    field = result = computeFieldValue();
            }
        }
        return result;
    }
    
  • static fieldsのlazy initializationに対してholder class習慣
  • を使用する
    private static class FieldHolder {
        static final FieldType field = computeFieldValue();
    }
    
    private static FieldType getField() { return FieldHolder.field; }
    
  • 反復初期化を許容できるinstance fieldsについてsingle-check慣習
  • を用いる.
    // Single-check idiom - can cause repeated initialization!
    private volatile FieldType field;
    
    private FieldType getField() {
        FieldType result = field;
        if (result == null)
            field = result = computeFieldValue();
        return result;
    }
    

    thread schedulerに頼らないで
  • 頼むなyieldとスレッド優先度
  • Threadsはbusy-waitではなく、check a shared object waiting for its state to change
  • // Awful CountDownLatch implementation - busy-waits incessantly!
    public class SlowCountDownLatch {
        private int count;
    
        public SlowCountDownLatch(int count) {
            if (count < 0)
               throw new IllegalArgumentException(count + " < 0");
            this.count = count;
        }
    
        public void await() {
            while (true) {
                synchronized(this) {
                    if (count == 0)
                        return;
                }
            }
        }
    
        public synchronized void countDown() {
            if (count != 0)
                count--;
        }
    }