Java同時プログラミング実戦-読書ノート
スレッドセキュリティの保証方法
たとえば、サーブレットのインプリメンテーションクラスの各カスタムメソッドは非共有状態です.しかし、クラスがクラス変数を導入すると、実装クラスは共有状態になります.(サーブレットのインスタンスは1つしかありません.サーブレットインスタンスに対応するサービスメソッドを実行するスレッドが要求されます)
check then act
問題はcheckが完了した後,actの前に観察結果が無効(改ざんされた)で予期せぬ異常を引き起こしたことである.たとえば、古典的な不活性初期化(単一モード)のエラーの書き方:
if(instance == null) {
instance = new Object
}
return instance
共有状態の複合操作
例えばi++は、この変数がスレッドセキュリティであっても、複合操作はスレッドセキュリティの問題をもたらすことに注意する.例えばvectorのput-if-absent
synchronized
Javaに内蔵された反発ロック.なお、そのメカニズムは
per-thread
であり、per-invocation
ではない.これは、内蔵ロックが再入可能であることを意味する.クライアントロック
古典的なput-if-absent問題、例えば私たちはVectorを使用して、その内部実装は一連の方法を
synchronized
でロックすると言っていますが、もしあなたが次の論理を実現したいならば:ある要素が存在しないならば、増加して、あなたはこのように実現する可能性があります:if(!vector.contains('X'))
vector.add('X')
これは、上述した複合操作であり、同期の問題があり、複合操作全体にロック、すなわちクライアントロックを追加する必要がある.
synchronized(vector) {
if(!vector.contains('X'))
vector.add('X')
}
私たちの例えば
ConcurrentHashMap
原生は同様の方法を提供しています.古典的な例
次に、キャッシュ実装の古典的な例を示します.インタフェースを定義します.
public interface Computable{
V compute(A arg);
}
インタフェースのすべての実装の
compute
メソッドに多くの時間がかかると仮定し、結果をキャッシュする必要があり、キャッシュクラスを設計する必要がある.次のようになります.public class CacheResult2implements Computable{
private Mapcache = new HashMap<>();
private Computablecomputable;
public CacheResult2(Computablecomputable) {
this.computable = computable;
}
@Override
public V compute(A arg) {
V result = cache.get(arg);
if (result == null) {
result = cache.put(arg, computable.compute(arg));
}
return result;
}
}
HashMap
キャッシュ結果を使用して、同時性の問題が明らかになりました.次に、この方法にsynchronized
のキーワードを付けると、3つのスレッド、t 1,t 2,t 3,t 1がargを1の値として計算したい場合、t 2がargを2の値として計算したい場合、t 3がt 1と同様にargが1の値として計算される場合、極端な場合はt 1,t 2,t 3がそれぞれロックされ、t 3にかかる時間がキャッシュされない時間が長くなる可能性がある.考えを変える.スレッドセキュリティの集合クラスを使用すると、ある計算に時間がかかる場合、複数のスレッドが同時にこの計算(同じargであることに注意)を行う可能性が高いという問題が同様に発生し、効率は同様に低い.最終的な解決策はFutureを使用することであり、あるスレッドがargの結果が計算中であることを発見した場合、彼は計算を行わない.もう一つの注意点はmapのputIfAbsent
法を用いて原子性を確保してください.そうしないと、同じように複数回計算する問題が発生する可能性がありますが、この可能性は相対的に小さいです.コードを入力:public class CacheResultimplements Computable{
private Map> cache = new ConcurrentHashMap<>();
private Computablecomputable;
public CacheResult(Computablecomputable) {
this.computable = computable;
}
@Override
public V compute(A arg) {
while (true) {
Future future = cache.get(arg);
if (future == null) {
Callable callable = () -> computable.compute(arg);
FutureTask futureTask = new FutureTask<>(callable);
future = cache.putIfAbsent(arg, futureTask); //げんしせい
if(future==null){// を1つのスレッドしか できないことを する
future = futureTask;
futureTask.run();
}
}
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
cache.remove(arg);
}
}
}
}
Interrupt
このアクションは、スレッドを中断するのではなく、スレッドを中断状態に設定します.停止するかどうかは、対応するスレッドによって決定されます.コードが
InterruptedException
を投げ出す必要がある方法を呼び出すと、通常は2つの方法があります.上層部に渡されます.またはinterruptを呼び出して割り込み状態を復元します.(最初のcatchが例外を処理したくない場合、後のcatchに処理する場合は、InterruptedExceptionをキャプチャした場合、ステータスクリアを中断する)を再intereupt現在のスレッドが必要です.try {
} catch(InterruptedException e) {
Thread.currentThread.interrupt();//
}
スレッドを呼び出すInterruptメソッドは、コンテキストによって異なる効果があります.JDKコメントを参照してください.
If this thread is blocked in an invocation of the {@link
* Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link
* Object#wait(long, int) wait(long, int)} methods of the {@link Object}
* class, or of the {@link #join()}, {@link #join(long)}, {@link
* #join(long, int)}, {@link #sleep(long)}, or {@link #sleep(long, int)},
* methods of this class, then its interrupt status will be cleared and it
* will receive an {@link InterruptedException}.
*
*
If this thread is blocked in an I/O operation upon an {@link
* java.nio.channels.InterruptibleChannel InterruptibleChannel}
* then the channel will be closed, the thread's interrupt
* status will be set, and the thread will receive a {@link
* java.nio.channels.ClosedByInterruptException}.
*
*
If this thread is blocked in a {@link java.nio.channels.Selector}
* then the thread's interrupt status will be set and it will return
* immediately from the selection operation, possibly with a non-zero
* value, just as if the selector's {@link
* java.nio.channels.Selector#wakeup wakeup} method were invoked.
*
*
If none of the previous conditions hold then this thread's interrupt
* status will be set.
catchブロックでは、最後のケースに対応します.
If none of the previous conditions hold then this thread's interrupt status will be set.
現在のスレッドは、割り込み状態を再開します.
ロック
CountDownLatch:
public long worksCost(List works) throws InterruptedException {
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(works.size());
for (Runnable work : works) {
new Thread(() -> {
try {
start.await();
try {
work.run();
} finally {
end.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
long times = System.currentTimeMillis();
start.countDown();
end.await();
return System.currentTimeMillis() - times;
}
FutureTask:その
get
メソッドを呼び出す場合、2つの例外を処理する必要があります:InterruptedException
:現在getスレッドが中断されるのを待つ.ExecutionException
:実行プロセスに異常が発生しました.異常はこのクラスにカプセル化されて投げ出される.次の例では、一般的な使い方を示します.public class DataLoader {
FutureTask futureTask = new FutureTask(() -> {
return null;//sth need time to load from file or db
});
Thread thread = new Thread(futureTask);
public DataLoader() {
thread.start();
}
public Object load() throws InterruptedException {
try {
return futureTask.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof SomeKnownException) {
throw (SomeKnownException)e.getCause();
} else {
throw SomeRuntimeException;
}
}
}
}
ExecutorCompletionService
ExecutorCompletionService
はCompletionService
を実現し、計算のハンドルのセットとして単純に理解できる:Executor
&BlockingQueue
の結合体.タスクが完了したかどうかを手動でポーリングするのではなく、ブロックキューを直接借りることを回避します.その内部原理は、FutureTask
のdone
メソッドを書き換え、対応するtaskが終了した後にtaskをキューに入れることです.図のようにソースコード:private class QueueingFuture extends FutureTask {
QueueingFuture(RunnableFuture task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future task;
}
Executor#invokeAll
この効果を望む場合は、タスクのセットを実行し、結果セット(Future)を一度に取得し、まだ完了していないタスクcancelに時間を設定することもできます.ではinvokeAllを使えば簡単にできます.その実装は、
AbstractExcutorService
の抽象クラスにある.public List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList> futures = new ArrayList>(tasks.size());
boolean done = false;
try {
for (Callable t : tasks) {
RunnableFuture f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
マルチスレッドのメリット&デメリット
メリット:
悪いところ