【マルチスレッド高同時】コンテナとキューの同期
23885 ワード
キーワード:同期コンテナ、キュー、ConcurrentMap、Copy-ON-Writeコンテナ、同時Queue、ConcurrentLinkedQueue、BlockQueueインタフェース、ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue githubアドレス:https://github.com/zhaikaishun/concurrent_programmingコードは主にMulti_003 , Multi_003\Multi_003\src\com\kaishun\base\coll013
同期コンテナはすべてスレッドが安全ですが、いくつかのシーンでは、反復、ジャンプ、条件演算などの複合クラス操作を保護するためにロックが必要になる場合があります.これらの複合操作は、マルチスレッドが同時に実行されるときに実行されます.予期せぬ動作が発生する可能性があります.最も古典的なのは、コンテナの反復中にコンテンツが同時に変更されたため、ConcurrentModificationExceptionです.シンクロコンテナ:古いVector、HashTableなど.これらの容器の同期機能はJDKのCollectionsによって実現される.synchronizedなどのファクトリメソッドが実現されるが,その下位のメカニズムは,従来のsynchronizedキーワードを用いて各共通のメソッドを同期させ,毎回1つのスレッドしかコンテナの状態にアクセスできないようにすることにほかならない.これは、今日のインターネット時代の高同時性のニーズに合致せず、スレッドのセキュリティを保証すると同時に、十分なパフォーマンスが必要であることは明らかです.
JDK5.0以降、同期コンテナの代わりに複数の同時コンテナが提供され、パフォーマンスが向上します.コンカレントコンテナはコンカレントに特化して設計されており、ハッシュを与えるHashTableの代わりにConcurrentHashMapを使用し、ConcurrentHashMapでは、Voctorの代わりにCopyOnWriteArrayListを使用し、コンカレントのCopyOnWriteArraySet、およびコンカレントのQueue、ConcurrentLinkedQueueおよびLinkedBlockingQueueを使用し、前者は高性能のキューであり、後者はブロック形式のキューであり、具体的にQueueを実現するには、ArrayBlockingQueue、PriorityBlockingQueue、SynchronousQueueなど、まだまだたくさんあります.
ConcurrentMapインタフェースには2つの重要な実装-ConcurrentHashMap-ConcurrentSkipListMap(ConcurrnetHashMapを補う同時ソート機能をサポート)ConcurrentHashMapは主にSegment(セグメント)方式を利用してロックの粒度を小さくし、同時性能を向上させるメカニズムを実現し、最大16セグメントに分けることができる.また、コードの多くの共有変数はVolatileキーワード宣言を使用しており、最初の時間に修正された内容を取得することを目的としており、図のように性能が非常によく、従来のHashTableは、map全体をロックし、ロックの粒度が比較的大きい.一方、CorrentHashMapは、このmapのある小さなSegmentに対してロックをかけ、どのセグメントでどのセグメントだけをロックするか、他のセグメントは影響せず、ロックの粒度が比較的小さいため、同時性能を向上させる.
具体的にどのように使うかは、前のHashMapとほぼそっくりなので、例を見てみましょう
Copy-On-WriteはCOWと略称され、プログラム設計における最適化戦略である.コピーコンテナは、書き込み時にコピーしたコンテナで、現在のコンテナをコピーして新しいコンテナをコピーし、信頼したいコンテナに要素を追加し、追加後、講原コンテナの参照で新しいコンテナを指します.このような利点は、現在のコンテナには要素が追加されていないため、CopyOnWriteも読み書き分離の考え方であり、読み書きが異なるコンテナであり、読み書きが少ないシーンに適していることです.JDKのCOWコンテナは2種類あります:CopyOnWriteArrayListとCopyOnWriteArraySet、COWコンテナは非常に有用で、非常に多くの同時コンテナシーンで使用することができます.使用方法も元のArrayList,setと同じです
JDKは、コンカレントキュー上で2つの実装を提供しています.1つは、ConcurrentLinkedQueueに代表される高性能キューであり、1つはBlockingQueueインタフェースに代表されるブロックキューであり、いずれもQueueから継承されます.
ConcurrentLinkedQueue:高同時シーンに適したキューであり、ロックなしで高同時状態での高性能を実現している.通常、ConcurrentLinkedQueueの性能はBlockingQueueより優れている.彼はリンクノードに基づく無境界スレッドセキュリティキューであり、このキューの要素は先進的な先出しの原則に従い、ヘッダは最新の加入であり、末尾は最近加入している.このキューではnull要素は許可されません.
ConcurrentLinkedQueueの重要な方法:add()とofferはいずれも要素を追加する方法(ConcurrentLinkedQueueではこの2つの方法に違いはありません)poll()とpeek()はいずれもヘッダ要素ノードであり、前者は要素を削除し、後者は削除しません.例:com.kaishun.base.coll013.UseQueue高性能無ブロック無境界キュー:ConcurrentLinkedQueue
5種類のqueueの実装があります.
配列ベースのブロックキュー実装は、ArrayBlockingQueue内部で、一定の配列を維持しながらキュー内のデータオブジェクトをキャッシュし、その内部に読み書き分離が実現されていないことは、生産と消費が完全に並列できないことを意味し、長さは定義が必要であり、先進先出または先進後出を指定することができる.有界行列とも呼ばれ、多くの場合に適しています.
5つの長さを指定したので、前に5つ追加しましたが、後に再度追加した場合、3秒以内に追加できません.3秒後に1つのfalseを返し、出力します.
超えてもaddメソッドを使うと、IllegalStateException:Queue full、例えば
チェーンテーブルベースのブロックキューは、ArrayBlockingQueueと同様に、内部にもデータバッファキュー(チェーンテーブルで構成されている)が維持されており、LinkBlockingQueueが同時データをコミカルに処理できるのは、内部に読み書き分離ロックが実現され、生産者と消費者の完全な並列運転が実現されているためであり、彼は無境界キューである.
drainToは一度に複数の要素を取ります
優先度に基づくブロックキュー(優先度の判断は、関数が伝達するCompatorオブジェクトを構築することによって決定され、すなわち、伝達キューのオブジェクトはComparableインタフェースを実現しなければならない)であり、PriorityBlockingQueueを実現する際に、内部制御スレッド同期のロックは公平ロックを採用し、彼も無境界のキューである.例:Taskクラス、Comparableメソッドを実装し、comparareToメソッドを書き換える
UsePriorityBlockingQueueクラステストが整列キューであるかどうか
しゅつりょく
説明、このキューはtakeがない場合、まだソートされていません.take()の場合、ソート、比較の方法を利用します.
遅延時間を持つQueueは、指定された遅延時間になった場合にのみキューから取得できます.DelayQueueの要素はDelayedインタフェースを実装する必要があります.DelayQueueはサイズ制限のないキューで、キャッシュタイムアウトのデータを1か所、タスクタイムアウト処理、空き接続のクローズなど、適用シーンが多いです.クラシックなネットカフェのオンラインケース:WangminはDelayedインタフェースを実現しました
WangBaクラス
しゅつりょく
バッファリングされていないキューで、生産者が生成したデータは直接消費者に取得され、消費される個人は仮想キューと理解され、このキューは要素を保存せず、生産と消費は互いに投げ合うだけだ.
-本文は先人の経験を総括して、とても疲れていることを総括して、特にインターネットのアーキテクチャの白鶴翔先生に感謝します
1.1同期容器
同期コンテナはすべてスレッドが安全ですが、いくつかのシーンでは、反復、ジャンプ、条件演算などの複合クラス操作を保護するためにロックが必要になる場合があります.これらの複合操作は、マルチスレッドが同時に実行されるときに実行されます.予期せぬ動作が発生する可能性があります.最も古典的なのは、コンテナの反復中にコンテンツが同時に変更されたため、ConcurrentModificationExceptionです.シンクロコンテナ:古いVector、HashTableなど.これらの容器の同期機能はJDKのCollectionsによって実現される.synchronizedなどのファクトリメソッドが実現されるが,その下位のメカニズムは,従来のsynchronizedキーワードを用いて各共通のメソッドを同期させ,毎回1つのスレッドしかコンテナの状態にアクセスできないようにすることにほかならない.これは、今日のインターネット時代の高同時性のニーズに合致せず、スレッドのセキュリティを保証すると同時に、十分なパフォーマンスが必要であることは明らかです.
JDK5.0以降、同期コンテナの代わりに複数の同時コンテナが提供され、パフォーマンスが向上します.コンカレントコンテナはコンカレントに特化して設計されており、ハッシュを与えるHashTableの代わりにConcurrentHashMapを使用し、ConcurrentHashMapでは、Voctorの代わりにCopyOnWriteArrayListを使用し、コンカレントのCopyOnWriteArraySet、およびコンカレントのQueue、ConcurrentLinkedQueueおよびLinkedBlockingQueueを使用し、前者は高性能のキューであり、後者はブロック形式のキューであり、具体的にQueueを実現するには、ArrayBlockingQueue、PriorityBlockingQueue、SynchronousQueueなど、まだまだたくさんあります.
1.2 ConcurrentMap
ConcurrentMapインタフェースには2つの重要な実装-ConcurrentHashMap-ConcurrentSkipListMap(ConcurrnetHashMapを補う同時ソート機能をサポート)ConcurrentHashMapは主にSegment(セグメント)方式を利用してロックの粒度を小さくし、同時性能を向上させるメカニズムを実現し、最大16セグメントに分けることができる.また、コードの多くの共有変数はVolatileキーワード宣言を使用しており、最初の時間に修正された内容を取得することを目的としており、図のように性能が非常によく、従来のHashTableは、map全体をロックし、ロックの粒度が比較的大きい.一方、CorrentHashMapは、このmapのある小さなSegmentに対してロックをかけ、どのセグメントでどのセグメントだけをロックするか、他のセグメントは影響せず、ロックの粒度が比較的小さいため、同時性能を向上させる.
具体的にどのように使うかは、前のHashMapとほぼそっくりなので、例を見てみましょう
ConcurrentHashMap chm = new ConcurrentHashMap();
chm.put("k1", "v1");
chm.put("k2", "v2");
chm.put("k3", "v3");
chm.putIfAbsent("k4", "vvvv"); // key ,
1.3 Copy-ON-Write容器
Copy-On-WriteはCOWと略称され、プログラム設計における最適化戦略である.コピーコンテナは、書き込み時にコピーしたコンテナで、現在のコンテナをコピーして新しいコンテナをコピーし、信頼したいコンテナに要素を追加し、追加後、講原コンテナの参照で新しいコンテナを指します.このような利点は、現在のコンテナには要素が追加されていないため、CopyOnWriteも読み書き分離の考え方であり、読み書きが異なるコンテナであり、読み書きが少ないシーンに適していることです.JDKのCOWコンテナは2種類あります:CopyOnWriteArrayListとCopyOnWriteArraySet、COWコンテナは非常に有用で、非常に多くの同時コンテナシーンで使用することができます.使用方法も元のArrayList,setと同じです
CopyOnWriteArrayList cwal = new CopyOnWriteArrayList();
CopyOnWriteArraySet cwas = new CopyOnWriteArraySet();
2.1同時Queue
JDKは、コンカレントキュー上で2つの実装を提供しています.1つは、ConcurrentLinkedQueueに代表される高性能キューであり、1つはBlockingQueueインタフェースに代表されるブロックキューであり、いずれもQueueから継承されます.
2.2 ConcurrentLinkedQueue
ConcurrentLinkedQueue:高同時シーンに適したキューであり、ロックなしで高同時状態での高性能を実現している.通常、ConcurrentLinkedQueueの性能はBlockingQueueより優れている.彼はリンクノードに基づく無境界スレッドセキュリティキューであり、このキューの要素は先進的な先出しの原則に従い、ヘッダは最新の加入であり、末尾は最近加入している.このキューではnull要素は許可されません.
ConcurrentLinkedQueueの重要な方法:add()とofferはいずれも要素を追加する方法(ConcurrentLinkedQueueではこの2つの方法に違いはありません)poll()とpeek()はいずれもヘッダ要素ノードであり、前者は要素を削除し、後者は削除しません.例:com.kaishun.base.coll013.UseQueue高性能無ブロック無境界キュー:ConcurrentLinkedQueue
// :ConcurrentLinkedQueue
ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.add("e");
System.out.println(q.poll()); //a ,
System.out.println(q.size()); //4
System.out.println(q.peek()); //b
System.out.println(q.size()); //4
---- -----
a
4
b
4
2.3 BlockQueueインタフェース
5種類のqueueの実装があります.
ArrayBlockingQueue
配列ベースのブロックキュー実装は、ArrayBlockingQueue内部で、一定の配列を維持しながらキュー内のデータオブジェクトをキャッシュし、その内部に読み書き分離が実現されていないことは、生産と消費が完全に並列できないことを意味し、長さは定義が必要であり、先進先出または先進後出を指定することができる.有界行列とも呼ばれ、多くの場合に適しています.
ArrayBlockingQueue array = new ArrayBlockingQueue(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
// array.add("f");
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
5つの長さを指定したので、前に5つ追加しましたが、後に再度追加した場合、3秒以内に追加できません.3秒後に1つのfalseを返し、出力します.
false
超えてもaddメソッドを使うと、IllegalStateException:Queue full、例えば
ArrayBlockingQueue array = new ArrayBlockingQueue(5);
array.put("a");
array.put("b");
array.add("c");
array.add("d");
array.add("e");
array.add("f"); //
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
LinkedBlockingQueue:
チェーンテーブルベースのブロックキューは、ArrayBlockingQueueと同様に、内部にもデータバッファキュー(チェーンテーブルで構成されている)が維持されており、LinkBlockingQueueが同時データをコミカルに処理できるのは、内部に読み書き分離ロックが実現され、生産者と消費者の完全な並列運転が実現されているためであり、彼は無境界キューである.
//
LinkedBlockingQueue q = new LinkedBlockingQueue();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
System.out.println(q.size());
for (Iterator iterator = q.iterator(); iterator.hasNext();) {
String string = (String) iterator.next();
System.out.println(string);
}
----------- -----------
6
a
b
c
d
e
f
drainToは一度に複数の要素を取ります
//
LinkedBlockingQueue q = new LinkedBlockingQueue();
q.offer("a");
q.offer("b");
q.offer("c");
q.offer("d");
q.offer("e");
q.add("f");
System.out.println(q.size());
List list = new ArrayList();
// 3 , list
System.out.println(q.drainTo(list, 3));
System.out.println(list.size());
for (String string : list) {
System.out.println(string);
}
----------- -----------
6
3
3
a
b
c
PriorityBlockingQueue:
優先度に基づくブロックキュー(優先度の判断は、関数が伝達するCompatorオブジェクトを構築することによって決定され、すなわち、伝達キューのオブジェクトはComparableインタフェースを実現しなければならない)であり、PriorityBlockingQueueを実現する際に、内部制御スレッド同期のロックは公平ロックを採用し、彼も無境界のキューである.例:Taskクラス、Comparableメソッドを実装し、comparareToメソッドを書き換える
public class Task implements Comparable<Task>{
private int id ;
private String name;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public int compareTo(Task task) {
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
}
public String toString(){
return this.id + "," + this.name;
}
}
UsePriorityBlockingQueueクラステストが整列キューであるかどうか
public class UsePriorityBlockingQueue {
public static void main(String[] args) throws Exception{
PriorityBlockingQueue q = new PriorityBlockingQueue();
Task t1 = new Task();
t1.setId(3);
t1.setName("id 3");
Task t2 = new Task();
t2.setId(4);
t2.setName("id 4");
Task t3 = new Task();
t3.setId(1);
t3.setName("id 1");
//return this.id > task.id ? 1 : 0;
q.add(t1); //3
q.add(t2); //4
q.add(t3); //1
// 1 3 4
System.out.println(" :" + q);
System.out.println(q.take().getId());
System.out.println(" :" + q);
System.out.println(q.take().getId());
System.out.println(q.take().getId());
}
}
しゅつりょく
:[1,id 1, 4,id 4, 3,id 3]
1
:[3,id 3, 4,id 4]
3
4
説明、このキューはtakeがない場合、まだソートされていません.take()の場合、ソート、比較の方法を利用します.
DelayQueue:
遅延時間を持つQueueは、指定された遅延時間になった場合にのみキューから取得できます.DelayQueueの要素はDelayedインタフェースを実装する必要があります.DelayQueueはサイズ制限のないキューで、キャッシュタイムアウトのデータを1か所、タスクタイムアウト処理、空き接続のクローズなど、適用シーンが多いです.クラシックなネットカフェのオンラインケース:WangminはDelayedインタフェースを実現しました
public class Wangmin implements Delayed {
private String name;
//
private String id;
//
private long endTime;
//
private TimeUnit timeUnit = TimeUnit.SECONDS;
public Wangmin(String name,String id,long endTime){
this.name=name;
this.id=id;
this.endTime = endTime;
}
public String getName(){
return this.name;
}
public String getId(){
return this.id;
}
/**
*
*/
@Override
public long getDelay(TimeUnit unit) {
//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return endTime - System.currentTimeMillis();
}
/**
*
*/
@Override
public int compareTo(Delayed delayed) {
Wangmin w = (Wangmin)delayed;
return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
}
WangBaクラス
public class WangBa implements Runnable {
private DelayQueue queue = new DelayQueue();
public boolean yinye =true;
public void shangji(String name,String id,int money){
Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
System.out.println(" "+man.getName()+" "+man.getId()+" "+money+" , ...");
this.queue.add(man);
}
public void xiaji(Wangmin man){
System.out.println(" "+man.getName()+" "+man.getId()+" ...");
}
@Override
public void run() {
while(yinye){
try {
Wangmin man = queue.take();
xiaji(man);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]){
try{
System.out.println(" ");
WangBa siyu = new WangBa();
Thread shangwang = new Thread(siyu);
shangwang.start();
siyu.shangji(" ", "123", 1);
siyu.shangji(" ", "234", 10);
siyu.shangji(" ", "345", 5);
}
catch(Exception e){
e.printStackTrace();
}
}
}
しゅつりょく
123 1 , ...
234 10 , ...
345 5 , ...
123 ...
345 ...
234 ...
SynchronousQueue:
バッファリングされていないキューで、生産者が生成したデータは直接消費者に取得され、消費される個人は仮想キューと理解され、このキューは要素を保存せず、生産と消費は互いに投げ合うだけだ.
final SynchronousQueue q = new SynchronousQueue();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(q.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
q.add("asdasd");
}
});
t2.start();
--------- -------
asdasd
-本文は先人の経験を総括して、とても疲れていることを総括して、特にインターネットのアーキテクチャの白鶴翔先生に感謝します