Java同時プログラミングの基本知識
同時プログラミングの原則
RunnableとThreadここではRunnableインタフェースを実現することとThreadクラスを継承することの違いを説明します.10枚の切符を売るタスクを例にとると、Threadクラスを継承すると、3つのスレッドを起動することは3つのウィンドウを開くことに相当し、各ウィンドウに10枚の切符を売るタスクがあり、それぞれ販売されています.Runnableインタフェースを実現すれば、3つのスレッドを起動してかなり3つのウィンドウを開いて切符を売って、この3つのウィンドウは全部で10枚の切符を売っています.
synchronizedキーワード
1.synchronizedオブジェクトロック
synchronized(this)メソッドとsynchronizedメソッドは、現在のオブジェクトをロックし、synchronized(obj)は臨界オブジェクトをロックします.synchronizedを使用する場合は、臨界オブジェクトをロックすることが望ましい.任意の複数のスレッドの任意の複数のユーザがアクセスする際に問題がないようにするには、現在のオブジェクトをロックする方法を考えてみましょう.現在のオブジェクトのレベルが重いので、一般的には使用しません.
次のSyncクラスの2つのメソッドtest_01とtest_02()ロックされているのはプログラムによって作成されたSyncオブジェクトで、細粒度制御推奨用test_02().
public synchronized void test_01() {
System.out.println(" ");
}
public void test_02() {
synchronized (this) {
System.out.println(" ");
}
}
次の方法はSyncオブジェクトのobjectオブジェクト(つまり臨界オブジェクト)をロックします.
public void test_03() {
synchronized (object) {
System.out.println(" ");
}
}
2.synchronized静的メソッドで現在のクラスをロックするには
静的同期メソッドは、Syncクラスのstatic test_のような現在のタイプのクラスオブジェクトをロックします.04()メソッドに同期ロックsynchronizedが付加すると、synchronizedはsyncである.class.
//次の2つの方法は静的同期方法です
public static synchronized void test_04() {
System.out.println(" Sync.class");
}
public static void test_05() {
synchronized (Sync.class) {
System.out.println(" Sync.class ");
}
}
3.synchronizedが静的および非静的に作用する方法の違い
synchronizedの役割は非静的方法であり、単一のオブジェクトをロックすることに相当し、異なるオブジェクト間に競争関係はない.静的メソッドとして作用する場合、ロックロードクラス、すなわちclassをロックします.この場合、すべてのオブジェクトが同じロックを競合することに相当します.
次の例では、スレッド1はi=5のときに異常を放出し、このときスレッド1ロックが解放され、スレッド2はメソッドの呼び出しを開始する.
public class Test {
static class Test02 implements Runnable {
private int i = 0;
@Override
public synchronized void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "_" + i++);
if (i == 5) { // i==5 ,
i = 1 / 0;
}
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored) { }
}
}
}
public static void main(String[] args) {
Test02 test02 = new Test02();
new Thread(test02, "LQ").start();
new Thread(test02, "WH").start();
}
}
public class Test {static Object object = new Object();void m() {System.out.println(Thread.currentThread().getName() + "start...");synchronized (object){while (true) {try {TimeUnit.SECONDS.sleep(1);} catch (Exception ignored) {}System.out.println(Thread.currentThread().getName() + "-"+ object.hashCode());}}}
static class Test01 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
static class Test02 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
public static void main(String[] args) {
Test01 test01 = new Test01();
Thread thread = new Thread(test01, "LQ");
thread.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception ignored) {}
Test02 test02 = new Test02();
thread = new Thread(test02, "WH");
thread.start();
}
}WHスレッドに新しいObjectが作成され、WHは正常に動作します.
public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object) {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored){}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}
static class Test01 implements Runnable {
@Override
public void run() {
new Test().m();
}
}
static class Test02 implements Runnable {
@Override
public void run() {
object = new Object();
new Test().m();
}
}
public static void main(String[] args) {
Test01 test01 = new Test01();
Thread thread = new Thread(test01, "LQ");
thread.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception ignored) {}
Test02 test02 = new Test02();
thread = new Thread(test02, "WH");
thread.start();
}
}
上のコードでは、WHスレッドが起動すると1つだけ待機状態になります.objectはLQスレッドにロックされているためですが、WHスレッドで新たにnew Object()をnew Object()として付与すると、WHスレッドは正常に動作します.なぜなら、同期ロックはメモリ内のオブジェクトに対してロックされているので、LQロックは1回目のnewのオブジェクトで、WHロックは2回目のnewのオブジェクトで、下図の通りです.
定数:String a="aaa"とString b="aaa"は同じオブジェクトであるため、Aメソッドがaをロックし、Bメソッドがbをロックし、LQスレッド呼び出しAメソッドを起動し、WHスレッド呼び出しBメソッドを起動すると、LQスレッドが終了するまでWHスレッドが実行される.したがって、同期コードブロックを定義するときは、ロックのターゲットオブジェクトとして定数を使用しないでください.
volatileキーワードコンピュータにはCPU、メモリ、キャッシュがあり、CPUが実行されるとデフォルトでキャッシュ中のデータが見つかります.CPUが中断した場合、オペレーティングシステムによるCPUの管理特性により、キャッシュをクリアし、メモリのデータをキャッシュに読み直したり、キャッシュをクリアしない場合があり、キャッシュのデータを使用して後続の計算を行うことができます.CPUが中断しない場合、デフォルトのCPUはキャッシュデータのみを探します.volatileというキーワードはキャッシュデータの特性を変えるものではなく、メモリのデータ特性を直接変えるものであり、1つのオブジェクトにvolatileキーワード修飾を加えた場合、最下位OSオペレーティングシステムに通知し、計算を行うたびにメモリデータが変更されたかどうかをCPUに伝えることに相当する.これがメモリの可視性である.volatileキーワードは、メモリの可視性を保証するためです.
次のコードでデッドロックが発生します.
public class Volatile01 {
private static boolean b = true;
private void m() {
System.out.println("start...");
while (b) {}
System.out.println("end...");
}
static class Volatile_01 implements Runnable {
@Override
public void run() {
new Volatile01().m();
}
}
public static void main(String[] args) {
Volatile_01 = new Volatile_01();
new Thread(volatile_01).start();
try {
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored) {}
b = false;
}
}
上記コードブロック中の共有変数bをvolatileで修飾すると(可視性が保証される)、ループから飛び出すことができる.
public class Volatile01 {
private static volatile boolean b = true;
private void m() {
System.out.println("start...");
while (b){}
System.out.println("end...");
}
static class Volatile_01 implements Runnable {
@Override
public void run(){
new Volatile01().m();
}
}
public static void main(String[] args) {
Volatile_01 = new Volatile_01();
new Thread(volatile_01).start();
try{
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException ignored){}
b = false;
}
}
join()メソッドは、joinを呼び出すスレッドの実行が完了するまで、複数のスレッドを結合し、スレッドをブロックします.
次のプログラムで印刷した結果は100000で、join()を使わないと印刷した結果は100000よりはるかに小さくなります.join()を使用すると、データの正確さを保証するために、スレッドのセットが実行されるのを待ってから後続の論理処理を行うことができます.
public class Test {
private static volatile int count = 0;
private void m() {
for (int i = 0; i < 10000; i++) {
count++;
}
}
static class Test02 implements Runnable {
@Override
public synchronized void run() {
new Test().m();
}
}
public static void main(String[] args) {
Test02 test02 = new Test02();
List threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(test02));
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(count);
}
}
上記のコードにはsynchronizedキーワードを用いて原子性を実現しているが、synchronizedを用いずにAtomicIntegerオブジェクトを用いてもよい.AtomicIntegerは原子的な操作オブジェクトであるため、コードは以下の通りである.
public class Test{
private static AtomicInteger count = new AtomicInteger();
private void m(){
for (int i = 0; i < 10000; i++){
count.incrementAndGet();
}
}
static class Test02 implements Runnable{
@Override
public void run(){
new Test().m();
}
}
public static void main(String[] args){
Test02 test02 = new Test02();
List threads = new ArrayList<>();
for (int i = 0; i < 10; i++){
threads.add(new Thread(test02));
}
for (Thread thread : threads){
thread.start();
try{
thread.join();
}catch (InterruptedException e){
e.printStackTrace();
}
}
System.out.println(count);
}
}
CountDownLatchオブジェクトCountDownLatchは1つのラッチに相当し、ラッチオブジェクトを作成するときにロックの個数を指定することができ、あるメソッドがラッチのawait()メソッドを呼び出すと、そのメソッドがawait()に実行されると、ラッチが解放されるのを待つのをブロックされ、ラッチにロックがなく、つまりラッチが開放されているときに実行を継続します.ラッチの施錠方法を減らすときcountDown()です.
以下の例では、m 1においてawait()が呼び出され、m 2においてcountDown()が呼び出されるので、m 2の論理によれば、m 2の実行が完了するとラッチ上のロック数は0となり、m 1メソッドは実行を継続することができる.
public class Test {
private CountDownLatch countDownLatch = new CountDownLatch(5);
private void m1() {
try {
countDownLatch.await(); //
} catch (Exception ignored) {
}
System.out.println("method m1.");
}
private void m2() {
while (countDownLatch.getCount() != 0) {
countDownLatch.countDown(); //
System.out.println("method m2");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
}
}
public static void main(String[] args) {
Test count01 = new Test();
new Thread(count01::m2).start();
new Thread(count01::m1).start();
}
}
, , , , 。
wait()、notify() notifyAll()
wait(): wait(), , notify() notifyAll() 。
notify(): 。
notifyAll(): notifyAll() , waiting 。
( ) , 10, , 。
public class DeviceSingleton {
private DeviceSingleton() {
}
private final int max = 10;
private int count = 0;
private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton();
public static DeviceSingleton getInstance() {
return DEVICE_SINGLETON;
}
private final List devices = new ArrayList<>();
/**
*
*/
public synchronized void add(E data) {
//
while (devices.size() == max) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("add: " + data);
ThreadUtils.sleep(1000);
devices.add(data);
count++;
this.notify();
}
/**
*
*/
public synchronized E get() {
E data = null;
while (devices.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ThreadUtils.sleep(1000);
data = devices.remove(0);
count--;
this.notifyAll();
return data;
}
/**
*
*/
public synchronized int size() {
return count;
}
@Data
static class Device {
private int id;
private String name;
public Device(int id, String name) {
this.id = id;
this.name = name;
}
}
static class ThreadUtils {
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (Exception ignore) {}
}
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceSingleton deviceSingleton = DeviceSingleton.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
Thread.sleep(2000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j));
}
}, "producer").start();
}
}
}
ReentrantLock
-
synchronized , , synchronized 。 , (lock.unlock())。 :
public class ReentrantLockTest {
private final Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); //
for (int i = 0; i < 10; i++) {
System.out.println("method m1() " + i);
ThreadUtils.sleep(1000);
}
lock.unlock(); //
}
private void m2() {
lock.lock(); //
System.out.println("method m2()");
lock.unlock(); //
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
new Thread(reentrantLockTest::m1).start();
new Thread(reentrantLockTest::m2).start();
}
}
- lock.tryLock()
false, , , true, 。 :
public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); //
for (int i = 0; i < 10; i++) {
ThreadUtils.sleep(1000);
System.out.println("method m1() " + i);
}
lock.unlock(); //
}
private void m2() {
boolean isLocked = false;
try {
/*
, , , false, true
, ,
,
,
, , , , 。 isLocked = lock.tryLock(5, TimeUnit.SECONDS); 5 (5 ), 。
*/
isLocked = lock.tryLock();
System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized");
} catch (Exception e) {
e.printStackTrace();
} finally {
//
if (isLocked) {
lock.unlock();
}
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
new Thread(reentrantLockTest::m1).start();
new Thread(reentrantLockTest::m2).start();
}
}
- lock.lockInterruptibly()
interrupt , interrupted , , InterruptedException , 。 :
public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private void m1() {
lock.lock(); //
for (int i = 0; i < 5; i++) {
ThreadUtils.sleep(1000);
System.out.println("method m1() " + i);
}
lock.unlock(); //
}
private void m2() {
try {
/*
, ,
*/
lock.lockInterruptibly(); //
System.out.println("method m2()");
} catch (InterruptedException e) {
System.out.println(" ");
} finally {
try {
lock.unlock();
} catch (Exception ignored) {
}
}
}
public static void main(String[] args) {
ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
Thread thread1 = new Thread(reentrantLockTest::m1);
thread1.start();
ThreadUtils.sleep(1000);
Thread thread2 = new Thread(reentrantLockTest::m2);
thread2.start();
ThreadUtils.sleep(1000);
thread2.interrupt(); //
}
}
: ReentrantLock , , ,notifyAll 。( thread.interruped(); object.notifyAll())。
?
Windows , , ( )。
-
。 , , , <=10, , , 。
public class ReentrantLockTest {
static class TestReentrantLock extends Thread {
// ReentrantLock true
private ReentrantLock lock = new ReentrantLock(true);
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " get lock.");
ThreadUtils.sleep(1000);
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
TestReentrantLock lock = new TestReentrantLock();
lock.start();
new Thread(lock).start();
new Thread(lock).start();
}
}
- Condition
Lock , , 、 。 Condition 。
public class DeviceContainer {
private DeviceContainer() {
}
private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();
public static DeviceContainer getInstance() {
return DEVICE_CONTAINER;
}
private final List list = new LinkedList<>();
private final int max = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void add(T t) {
lock.lock();
try {
while (this.size() == max) {
System.out.println(Thread.currentThread().getName() + " ");
// max , ,
//
producer.await();
}
System.out.println(Thread.currentThread().getName() + " ");
list.add(t);
count++;
//
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
lock.lock();
try {
while (this.size() == 0) {
System.out.println(Thread.currentThread().getName() + " ");
//
consumer.await();
}
System.out.println(Thread.currentThread().getName() + " ");
t = list.remove(0);
count--;
//
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
private int size() {
return count;
}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceContainer deviceSingleton = DeviceContainer.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
ThreadUtils.sleep(1000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new Device(j, "device " + j));
}
}, "producer-" + i).start();
}
}
}
Java
- Map/Set
ConcurrentHashMap/ConcurrentHashSet: Map/Set, , , synchronized 。key value null( HashMap HashSet)
ConcurrentSkipListMap/ConcurrentSkipListSet: Map/Set, , , ConcurrentHashMap/ConcurrentHashSet 。
CopyOnWriteArraySet: , , , 。
- List
CopyOnWriteArrayList: , , , 。
- Queue
ConcurrentLinkedQueue/ ConcurrentLinkedDeue: , ,ConcurrentLinkedQueue ,ConcurrentLinkedDeue , ***。
ArrayBlockingQueue/LinkedBlockingQueue: , , 0 。ArrayBlockingQueue , ;LinkedBlockingQueue , ***。ArrayBlockingQueue API , 。 。add ;put ;offer , false, true; offer , , true, , false。LinkedBlockingQueue add ;offer false, true; offer , , true, , false。
PriorityQueue: , ,***。
PriorityBlockingQueue: , ,***。
LinkedTransferQueue: , transfer 。 add , 。transfer TransferQueue , (take() )。 , transfer 。 。
SynchronousQueue: , 。 0 , TransferQuque。 。add , , 。put , , put 。
DelayQueue: ,***。 , 。 : , 。
, JVM , , JVM , 。 shutdown , 。
Executor
。Executor execute, 。 Runnable , Runnable。
public class Executor01 {
public static void main(String[] args) {
new Executor_01().execute(() ->
System.out.println(Thread.currentThread().getName() + " test executor.")
);
}
static class Executor_01 implements Executor {br/>@Override
public void execute(@NotNull Runnable command) {
new Thread(command).start();
}
}
}
ExecutorService
Executor , Executor , Future submit。
Executors
Executor , , , , 。 :void execute(),Future submit(Callable),Future submit(Runnable),void shutdown,boolean isShutdown(),boolean isTerminated()。
public class Test {
public static void main(String[] args) throws InterruptedException {
// 5
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
System.out.println(executorService);
//
executorService.shutdown();
// , , , , false
System.out.println(executorService.isTerminated());
// , shutdown
System.out.println(executorService.isShutdown());
System.out.println(executorService);
ThreadUtils.sleep(1000);
// 5 , , , true
System.out.println(executorService.isTerminated());
System.out.println(executorService.isShutdown());
System.out.println(executorService);
}
}
Future
, 。 get 。
:get()、get(long, TimeUnit) isDown()。
get(): ;
get(long, TimeUnit): , , , 。
isDown(): call , , isDown ExecutorService isShutdown ,isShutdown 。
public class ExecutorServiceTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
testExecutorService();
}
private static void testExecutorService() throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(1);
Future future = service.submit(() -> {
ThreadUtils.sleep(1000);
return Thread.currentThread().getName() + " submit.";
});
// call ,
// , , ExecutorService isShutDowm , isShutdowm , shutdown
System.out.println(future.isDone());
// call
System.out.println(future.get()); // false
System.out.println(future.isDone());
System.out.println(future.get()); // true
//
service.shutdown();
}
}
Callable
。 Runnable , 。
:call(), Runnable run , call 。
Callable Runnable : , Callable, 。
ThreadPoolExecutor
new ThreadPoolExecutor , ThreadPoolExecutor :
:
corePoolSize
maximumPoolSize
keepAliveTime
unitTimeUnit
workQueueBlockingQueue
threadFactoryThreadFactory
handlerRejectedExecutionHandler
:
corePoolSize , corePoolSize , ; , workQueueBlockingQueue , , ; , , maximumPoolSize , , , 。 corePoolSize , keepAliveTime, , corePoolSize。
public class ExecutorThreadPoolTest {
public static void main(String[] args) {
testExecutorThreadPool();
}
private static void testExecutorThreadPool() {
// , 2, 4, 10
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
4,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new MyTreadFactory(),
new MyIgnorePolicy());
// ,
executor.prestartAllCoreThreads();
//
for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}
}
static class MyTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable runnable) {
Thread t = new Thread(runnable, " 【" + mThreadNum.getAndIncrement() + "】");
System.out.println(t.getName() + " ");
return t;
}
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
doLog(runnable, executor);
}
private void doLog(Runnable runnable, ThreadPoolExecutor executor) {
System.err.println(runnable.toString() + " ");
}
}
@Data
static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(this.toString() + " ");
ThreadUtils.sleep(1000);
}
@Override
public String toString() {
return " 【" + name + "】";
}
}
}
FixedThreadPool
, Executors , , 。 :
, nThreads, 0, LinkedBlockingQueue, Integer.MAX_VALUE。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
// 10 FixedThreadPool
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
service.execute(()-> System.out.println(Thread.currentThread().getName()));
}
//
service.shutdown();
}
}
CachedThreadPool
, Executors , Integer.MAX_VALUE, , ( FixedThreadPool ,FixedThreadPool shutdown )。 :
, 0, Integer.MAX_VALUE, 60 , SynchronousQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
//
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 5; i++) {
service.execute(() -> {
ThreadUtils.sleep(1000);
System.out.println(Thread.currentThread().getName() + " executor.");
});
}
System.out.println(service);
ThreadUtils.sleep(65);
System.out.println(service);
}
}
ScheduledThreadPool
, , Executors , 。 , 、 。 :
, Integer.MAX_VALUE, 0, DelayedWorkQuquq。
:scheduledAtFixedRate、schedule、execute 。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
//
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
System.out.println(service);
// , 500 , 300
service.scheduleAtFixedRate(() -> {
ThreadUtils.sleep(1000);
System.out.println(Thread.currentThread().getName() + " executor.");
}, 500, 300, TimeUnit.MILLISECONDS);
System.out.println(service);
service.shutdown();
}
}
SingleThreadExecutor
。 。 。 , 。 :
1, LinkedBlockingQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
//
ExecutorService service = Executors.newSingleThreadExecutor();
System.out.println(service);
for (int i = 0; i < 5; i++) {
service.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
service.shutdown();
}
}
ForkJoinPool
, 。 CPU 。
ForkJoinPool , , ,, 。 , fork join ( )。
ForkJoinTask (RecursiveTask RecursiveAction, , ), 。
ForkJoinTask RecursiveTask RecursiveAction,RecursiveTask ,RecursiveAction ( Callable Runnable )。
ForkJoinTask compute , 。
、 。
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long result = 0L;
for (int NUMBER : NUMBERS) {
result += NUMBER;
}
System.out.println(result);
ForkJoinPool pool = new ForkJoinPool();
//
AddTask task = new AddTask(0, NUMBERS.length);
//
Future future = pool.submit(task);
System.out.println(future.get());
}
private static final int[] NUMBERS = new int[1000000];
private static final int MAX_SIZE = 50000;
private static final Random RANDOM = new Random();
static {
for (int i = 0; i < NUMBERS.length; i++) {
NUMBERS[i] = RANDOM.nextInt(1000);
}
}
static class AddTask extends RecursiveTask {
int begin, end;
AddTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Long compute() {
if ((end - begin) < MAX_SIZE) {
long sum = 0L;
for (int i = begin; i < end; i++) {
sum += NUMBERS[i];
}
return sum;
} else {
//
int middle = begin + (end - begin) / 2;
AddTask task1 = new AddTask(begin, middle);
AddTask task2 = new AddTask(middle, end);
// ,
task1.fork();
task2.fork();
// join , , ,
return task1.join() + task2.join();
}
}
}
}
, , , 。 , , , 。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() {
ThreadGroup group = new ThreadGroup("LQ");
Thread thread = new Thread(group, () ->
System.out.println("group is " + Thread.currentThread().getThreadGroup().getName())
);
thread.start();
}
}
、 , , , 。