【マルチスレッド高同時】コンテナとキューの同期


キーワード:同期コンテナ、キュー、ConcurrentMap、Copy-ON-Writeコンテナ、同時Queue、ConcurrentLinkedQueue、BlockQueueインタフェース、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue githubアドレス:https://github.com/zhaikaishun/concurrent_programmingコードは主にMulti_003 , Multi_003\Multi_003\src\com\kaishun\base\coll013

1.1同期容器


同期コンテナはすべてスレッドが安全ですが、いくつかのシーンでは、反復、ジャンプ、条件演算などの複合クラス操作を保護するためにロックが必要になる場合があります.これらの複合操作は、マルチスレッドが同時に実行されるときに実行されます.予期せぬ動作が発生する可能性があります.最も古典的なのは、コンテナの反復中にコンテンツが同時に変更されたため、ConcurrentModificationExceptionです.シンクロコンテナ:古いVector、HashTableなど.これらの容器の同期機能はJDKのCollectionsによって実現される.synchronizedなどのファクトリメソッドが実現されるが,その下位のメカニズムは,従来のsynchronizedキーワードを用いて各共通のメソッドを同期させ,毎回1つのスレッドしかコンテナの状態にアクセスできないようにすることにほかならない.これは、今日のインターネット時代の高同時性のニーズに合致せず、スレッドのセキュリティを保証すると同時に、十分なパフォーマンスが必要であることは明らかです.
JDK5.0以降、同期コンテナの代わりに複数の同時コンテナが提供され、パフォーマンスが向上します.コンカレントコンテナはコンカレントに特化して設計されており、ハッシュを与えるHashTableの代わりにConcurrentHashMapを使用し、ConcurrentHashMapでは、Voctorの代わりにCopyOnWriteArrayListを使用し、コンカレントのCopyOnWriteArraySet、およびコンカレントのQueue、ConcurrentLinkedQueueおよびLinkedBlockingQueueを使用し、前者は高性能のキューであり、後者はブロック形式のキューであり、具体的にQueueを実現するには、ArrayBlockingQueue、PriorityBlockingQueue、SynchronousQueueなど、まだまだたくさんあります.

1.2 ConcurrentMap


ConcurrentMapインタフェースには2つの重要な実装-ConcurrentHashMap-ConcurrentSkipListMap(ConcurrnetHashMapを補う同時ソート機能をサポート)ConcurrentHashMapは主にSegment(セグメント)方式を利用してロックの粒度を小さくし、同時性能を向上させるメカニズムを実現し、最大16セグメントに分けることができる.また、コードの多くの共有変数はVolatileキーワード宣言を使用しており、最初の時間に修正された内容を取得することを目的としており、図のように性能が非常によく、従来のHashTableは、map全体をロックし、ロックの粒度が比較的大きい.一方、CorrentHashMapは、このmapのある小さなSegmentに対してロックをかけ、どのセグメントでどのセグメントだけをロックするか、他のセグメントは影響せず、ロックの粒度が比較的小さいため、同時性能を向上させる.
具体的にどのように使うかは、前のHashMapとほぼそっくりなので、例を見てみましょう
        ConcurrentHashMap chm = new ConcurrentHashMap();
        chm.put("k1", "v1");
        chm.put("k2", "v2");
        chm.put("k3", "v3");
        chm.putIfAbsent("k4", "vvvv"); //  key , 

1.3 Copy-ON-Write容器


Copy-On-WriteはCOWと略称され、プログラム設計における最適化戦略である.コピーコンテナは、書き込み時にコピーしたコンテナで、現在のコンテナをコピーして新しいコンテナをコピーし、信頼したいコンテナに要素を追加し、追加後、講原コンテナの参照で新しいコンテナを指します.このような利点は、現在のコンテナには要素が追加されていないため、CopyOnWriteも読み書き分離の考え方であり、読み書きが異なるコンテナであり、読み書きが少ないシーンに適していることです.JDKのCOWコンテナは2種類あります:CopyOnWriteArrayListとCopyOnWriteArraySet、COWコンテナは非常に有用で、非常に多くの同時コンテナシーンで使用することができます.使用方法も元のArrayList,setと同じです
        CopyOnWriteArrayList cwal = new CopyOnWriteArrayList();
        CopyOnWriteArraySet cwas = new CopyOnWriteArraySet();

2.1同時Queue


JDKは、コンカレントキュー上で2つの実装を提供しています.1つは、ConcurrentLinkedQueueに代表される高性能キューであり、1つはBlockingQueueインタフェースに代表されるブロックキューであり、いずれもQueueから継承されます.

2.2 ConcurrentLinkedQueue


ConcurrentLinkedQueue:高同時シーンに適したキューであり、ロックなしで高同時状態での高性能を実現している.通常、ConcurrentLinkedQueueの性能はBlockingQueueより優れている.彼はリンクノードに基づく無境界スレッドセキュリティキューであり、このキューの要素は先進的な先出しの原則に従い、ヘッダは最新の加入であり、末尾は最近加入している.このキューではnull要素は許可されません.
ConcurrentLinkedQueueの重要な方法:add()とofferはいずれも要素を追加する方法(ConcurrentLinkedQueueではこの2つの方法に違いはありません)poll()とpeek()はいずれもヘッダ要素ノードであり、前者は要素を削除し、後者は削除しません.例:com.kaishun.base.coll013.UseQueue高性能無ブロック無境界キュー:ConcurrentLinkedQueue
// :ConcurrentLinkedQueue

ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.add("e");

System.out.println(q.poll());   //a  , 
System.out.println(q.size());   //4
System.out.println(q.peek());   //b
System.out.println(q.size());   //4

---- -----
a
4
b
4

2.3 BlockQueueインタフェース


5種類のqueueの実装があります.

ArrayBlockingQueue


配列ベースのブロックキュー実装は、ArrayBlockingQueue内部で、一定の配列を維持しながらキュー内のデータオブジェクトをキャッシュし、その内部に読み書き分離が実現されていないことは、生産と消費が完全に並列できないことを意味し、長さは定義が必要であり、先進先出または先進後出を指定することができる.有界行列とも呼ばれ、多くの場合に適しています.
        ArrayBlockingQueue array = new ArrayBlockingQueue(5);
        array.put("a");
        array.put("b");
        array.add("c");
        array.add("d");
        array.add("e");
//      array.add("f");
        System.out.println(array.offer("a", 3, TimeUnit.SECONDS));

5つの長さを指定したので、前に5つ追加しましたが、後に再度追加した場合、3秒以内に追加できません.3秒後に1つのfalseを返し、出力します.
false

超えてもaddメソッドを使うと、IllegalStateException:Queue full、例えば
        ArrayBlockingQueue array = new ArrayBlockingQueue(5);
        array.put("a");
        array.put("b");
        array.add("c");
        array.add("d");
        array.add("e");
        array.add("f");  // 
        System.out.println(array.offer("a", 3, TimeUnit.SECONDS));

LinkedBlockingQueue:


チェーンテーブルベースのブロックキューは、ArrayBlockingQueueと同様に、内部にもデータバッファキュー(チェーンテーブルで構成されている)が維持されており、LinkBlockingQueueが同時データをコミカルに処理できるのは、内部に読み書き分離ロックが実現され、生産者と消費者の完全な並列運転が実現されているためであり、彼は無境界キューである.
        // 
        LinkedBlockingQueue q = new LinkedBlockingQueue();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.offer("d");
        q.offer("e");
        q.add("f");
        System.out.println(q.size());

        for (Iterator iterator = q.iterator(); iterator.hasNext();) {
            String string = (String) iterator.next();
            System.out.println(string);
        }
----------- -----------
6
a
b
c
d
e
f

drainToは一度に複数の要素を取ります
        // 
        LinkedBlockingQueue q = new LinkedBlockingQueue();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.offer("d");
        q.offer("e");
        q.add("f");
        System.out.println(q.size());

        List list = new ArrayList();
        // 3 ,  list  
        System.out.println(q.drainTo(list, 3));
        System.out.println(list.size());
        for (String string : list) {
            System.out.println(string);
        }
----------- -----------
6
3
3
a
b
c

PriorityBlockingQueue:


優先度に基づくブロックキュー(優先度の判断は、関数が伝達するCompatorオブジェクトを構築することによって決定され、すなわち、伝達キューのオブジェクトはComparableインタフェースを実現しなければならない)であり、PriorityBlockingQueueを実現する際に、内部制御スレッド同期のロックは公平ロックを採用し、彼も無境界のキューである.例:Taskクラス、Comparableメソッドを実装し、comparareToメソッドを書き換える
public class Task implements Comparable<Task>{

    private int id ;
    private String name;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public int compareTo(Task task) {
        return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);  
    }

    public String toString(){
        return this.id + "," + this.name;
    }

}

UsePriorityBlockingQueueクラステストが整列キューであるかどうか
public class UsePriorityBlockingQueue {


    public static void main(String[] args) throws Exception{


        PriorityBlockingQueue q = new PriorityBlockingQueue();

        Task t1 = new Task();
        t1.setId(3);
        t1.setName("id 3");
        Task t2 = new Task();
        t2.setId(4);
        t2.setName("id 4");
        Task t3 = new Task();
        t3.setId(1);
        t3.setName("id 1");

        //return this.id > task.id ? 1 : 0;
        q.add(t1);  //3
        q.add(t2);  //4
        q.add(t3);  //1

        // 1 3 4
        System.out.println(" :" + q);
        System.out.println(q.take().getId());
        System.out.println(" :" + q);
        System.out.println(q.take().getId());
        System.out.println(q.take().getId());
    }
}

しゅつりょく
 :[1,id 1, 4,id 4, 3,id 3]
1
 :[3,id 3, 4,id 4]
3
4

説明、このキューはtakeがない場合、まだソートされていません.take()の場合、ソート、比較の方法を利用します.

DelayQueue:


遅延時間を持つQueueは、指定された遅延時間になった場合にのみキューから取得できます.DelayQueueの要素はDelayedインタフェースを実装する必要があります.DelayQueueはサイズ制限のないキューで、キャッシュタイムアウトのデータを1か所、タスクタイムアウト処理、空き接続のクローズなど、適用シーンが多いです.クラシックなネットカフェのオンラインケース:WangminはDelayedインタフェースを実現しました
public class Wangmin implements Delayed {  

    private String name;  
    //   
    private String id;  
    //   
    private long endTime;  
    // 
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    public Wangmin(String name,String id,long endTime){  
        this.name=name;  
        this.id=id;  
        this.endTime = endTime;  
    }  

    public String getName(){  
        return this.name;  
    }  

    public String getId(){  
        return this.id;  
    }  

    /** 
     *   
     */  
    @Override  
    public long getDelay(TimeUnit unit) { 
        //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        return endTime - System.currentTimeMillis();
    }  

    /** 
     *   
     */  
    @Override  
    public int compareTo(Delayed delayed) {  
        Wangmin w = (Wangmin)delayed;  
        return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;  
    } 

WangBaクラス
public class WangBa implements Runnable {  

    private DelayQueue queue = new DelayQueue();  

    public boolean yinye =true;  

    public void shangji(String name,String id,int money){  
        Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());  
        System.out.println(" "+man.getName()+"  "+man.getId()+" "+money+" , ...");  
        this.queue.add(man);  
    }  

    public void xiaji(Wangmin man){  
        System.out.println(" "+man.getName()+"  "+man.getId()+" ...");  
    }  

    @Override  
    public void run() {  
        while(yinye){  
            try {  
                Wangmin man = queue.take();  
                xiaji(man);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    public static void main(String args[]){  
        try{  
            System.out.println(" ");  
            WangBa siyu = new WangBa();  
            Thread shangwang = new Thread(siyu);  
            shangwang.start();  

            siyu.shangji(" ", "123", 1);  
            siyu.shangji(" ", "234", 10);  
            siyu.shangji(" ", "345", 5);  
        }  
        catch(Exception e){  
            e.printStackTrace();
        }  

    }  
}  

しゅつりょく
 
   123 1 , ...
   234 10 , ...
   345 5 , ...
   123 ...
   345 ...
   234 ...

SynchronousQueue:


バッファリングされていないキューで、生産者が生成したデータは直接消費者に取得され、消費される個人は仮想キューと理解され、このキューは要素を保存せず、生産と消費は互いに投げ合うだけだ.
        final SynchronousQueue q = new SynchronousQueue();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(q.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                q.add("asdasd");
            }
        });
        t2.start();
--------- -------
asdasd

-本文は先人の経験を総括して、とても疲れていることを総括して、特にインターネットのアーキテクチャの白鶴翔先生に感謝します