第三章-Executorsの最大化
もっと読む
Executorsのいくつかの高度な特性
タスクのキャンセル
タスクをexecutorに送信すると、このタスクの実行をキャンセルすることができます.submit()メソッドを使用してRunnableオブジェクトをexecutorに送信すると、submit()メソッドはFutureというインタフェースクラスを実装したオブジェクトを返します.このクラスのcancel()メソッドでタスクの実行をキャンセルできます.cancel()メソッドはboolean値をパラメータとして受信します.パラメータがtrueでexecutorがこのタスクを実行している場合、タスクを実行するスレッドは中断されます.
cancel()メソッドは、タスクがキャンセルされたかどうかを示すboolean値を返します.
タスクスケジューリング
ThreadPoolExecutorクラスは、インタフェースExecutorとExecutorServiceの基本的な実装クラスです.同時にJavaは、タスクのスケジューリングを実現するための拡張クラスを提供します.これにより、次のことができます.タスクの実行を遅延する は、一定の周波数でタスクを実行することと、一定の遅延時間でタスク を実行することとを含む周期的にタスクを実行する.
executorを再ロードする方法
既存のクラス(ThreadPoolExecutorまたはS h e d u l edThreadPoolExecutor)を継承することで、カスタムexecutorを実現できます.ThreadPoolExecutorクラスを継承している場合は、次の方法を再ロードできます. beforeExecute():このメソッドはexecuter内のパラレルタスクが実行される前に呼び出されます.このメソッドは、実行されるRunnableオブジェクトと、実行を担当するThreadオブジェクトの2つのパラメータを受信します.このメソッドで受信したRunnableオブジェクトは、実際にはクラスFutureTaskのインスタンスであり、submit()メソッドでexecutorのRunnableオブジェクトに送信されたわけではありません(このオブジェクトはFutureTaskにパッケージされています). afterExecute():このメソッドはexecuterのパラレルタスクが実行された後に呼び出されます.実行されたRunnableオブジェクトと、タスクから放出される可能性のある例外を保存する2つのパラメータを受信します.beforeExecute()メソッドに似ており、RunnableオブジェクトはFutureTaskクラスのインスタンスです. newTaskFor():このメソッドは、submit()メソッドで送信されたRunnableオブジェクトを実行するためのタスクを作成します.RunnableFutureインタフェースの実装を返さなければなりません.デフォルトでは、OpenJDK 8とOracle JDK 8はFutureTaskクラスのインスタンスを返しますが、これは将来のJDKバージョンで異なる場合があります.
ScheduledThreadPoolExecutorクラスを継承している場合は、decorateTask()メソッドを再ロードできます.この方法は上のnewTaskFor()のようなものであるが,スケジューリングタスクに対するものである.executorによって実行されたタスクを再ロードできます.
初期化パラメータを変更
いくつかのパラメータを変更してexecutorの動作を変更することもできます.次のような機能があります. BlockingQueue:各executorの内部では、実行されるタスクを保存するためにBlockingQueueが維持されます.インタフェースのいずれかの実装クラスに転送できます.たとえば、executorがタスクを実行するデフォルトの順序を変更できます. ThreadFactory:このカスタムスレッドファクトリを使用して、タスクを実行するためのスレッドを作成するThreadFactoryインタフェースを実装したカスタムクラスを入力できます.たとえば、カスタムThredFactoryクラスが返すカスタムスレッドは、各タスクの実行時間をログに保存できます. RejectedExecutionHandler:shutdown()またはshutdownNow()メソッドを呼び出すと、executorに送信される新しいタスクはすべて拒否されます.この場合、RejectedExecutionHandlerインタフェースを実装したカスタムクラスに転送して処理できます.
最初の例-サーバアプリケーション
第2章では,クライアント/サーバ側アプリケーションを実現した.この例では、そのアプリケーションを次のように拡張します.
は、サーバに送信された要求 をキャンセルするための新しい要求を導入する.各要求は、要求の優先度を表す新しいパラメータを渡すことを可能にする.要求を制御ための実行順序 サーバは、ユーザごとに実行する要求の合計数と、実行する総消費時間 とを計算することができる.
Executorの追加の方法
次のExecutorメソッドを再ロードすることもできます. shutdown():このメソッドを呼び出してexecutorを終了する必要があります.この方法を再ロードして、追加のリソースを解放することができます.この方法は、executorがキューに並んで実行されるすべてのタスクを処理するのを待つ である. shutdownNow():sutdown()メソッドとは異なり、executorがキュー中のタスク を処理するのを待たない submit()、invokeall()、invokeany():これらのメソッドを呼び出してexecutorに同時タスクを送信します.タスクをexecutorに追加するタスクキューの前または後に追加の操作が必要な場合は、それらを再ロードできます.タスクがキューに挿入される前または後に実行される追加の操作は、タスクの実行前または実行後とは異なり、タスクの実行前後に追加の操作を行う場合は、beforeExecute()メソッドとafterExecute()メソッドを再ロードする必要があります.
ScheduledThreadPoolExecutorクラスでは、タスクの実行を遅らせたり、周期的なタスクを遅らせたりする方法があります. schedule():この方法では、指定された遅延後にタスクを実行できます.タスクは一度だけ実行されます. scheduleAtFixedRate():この方法では、指定された遅延後にタスクを周期的に実行できます.これは、scheduleWithFixedDelay()とは異なり、scheduleWithFixedDelay()メソッドの場合、2回の実行間隔は、前回の実行が終了してから次の実行が開始されるまでの間隔です.scheduleAtFixedRate()の場合、2回の実行間隔は、前回の実行開始から次の実行開始までの間隔です.
Executorsのいくつかの高度な特性
タスクのキャンセル
タスクをexecutorに送信すると、このタスクの実行をキャンセルすることができます.submit()メソッドを使用してRunnableオブジェクトをexecutorに送信すると、submit()メソッドはFutureというインタフェースクラスを実装したオブジェクトを返します.このクラスのcancel()メソッドでタスクの実行をキャンセルできます.cancel()メソッドはboolean値をパラメータとして受信します.パラメータがtrueでexecutorがこのタスクを実行している場合、タスクを実行するスレッドは中断されます.
cancel()メソッドは、タスクがキャンセルされたかどうかを示すboolean値を返します.
タスクスケジューリング
ThreadPoolExecutorクラスは、インタフェースExecutorとExecutorServiceの基本的な実装クラスです.同時にJavaは、タスクのスケジューリングを実現するための拡張クラスを提供します.これにより、次のことができます.
executorを再ロードする方法
既存のクラス(ThreadPoolExecutorまたはS h e d u l edThreadPoolExecutor)を継承することで、カスタムexecutorを実現できます.ThreadPoolExecutorクラスを継承している場合は、次の方法を再ロードできます.
ScheduledThreadPoolExecutorクラスを継承している場合は、decorateTask()メソッドを再ロードできます.この方法は上のnewTaskFor()のようなものであるが,スケジューリングタスクに対するものである.executorによって実行されたタスクを再ロードできます.
初期化パラメータを変更
いくつかのパラメータを変更してexecutorの動作を変更することもできます.次のような機能があります.
最初の例-サーバアプリケーション
第2章では,クライアント/サーバ側アプリケーションを実現した.この例では、そのアプリケーションを次のように拡張します.
//
// ,
public class ExecutorStatistics {
private AtomicLong executionTime = new AtomicLong(0L);
private AtomicInteger numTasks = new AtomicInteger(0);
public void addExecutionTime(long time) {
executionTime.addAndGet(time);
}
public void addTask() {
numTasks.incrementAndGet();
}
@Override
public String toString() {
return "Executed Tasks: " + getNumTasks() +
". Execution Time: "+ getExecutionTime();
}
public AtomicLong getExecutionTime() {
return executionTime;
}
public AtomicInteger getNumTasks() {
return numTasks;
}
}
// executor shutdown() shutdownNow() ,executor
// ,
public class RejectedTaskController implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable task,
ThreadPoolExecutor executor) {
ConcurrentCommand command = (ConcurrentCommand) task;
Socket clientSocket = command.getSocket();
try {
PrintWriter out = new
PrintWriter(clientSocket.getOutputStream(), true);
String message = "The server is shutting down."
+ " Your request can not be served."
+ " Shutting Down: "
+ executor.isShutdown()
+ ". Terminated: "
+ executor.isTerminated()
+ ". Terminating: "
+ executor.isTerminating();
out.println(message);
out.close();
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* Runnable executor ,executor Runnable
* FutureTask , executor
* FutureTask Comparable , executor
* ( )
*/
public class ServerTask extends FutureTask implements Comparable> {
private ConcurrentCommand command;
public ServerTask(ConcurrentCommand command) {
super(command, null);
this.command=command;
}
public ConcurrentCommand getCommand() {
return command;
}
public void setCommand(ConcurrentCommand command) {
this.command = command;
}
@Override
public int compareTo(ServerTask other) {
return command.compareTo(other.getCommand());
}
}
// executor,
public class ServerExecutor extends ThreadPoolExecutor {
// , ServerTask ( Runnable )
//
private ConcurrentHashMap startTimes;
// ,
// ExecutorStatistics
private ConcurrentHashMap
executionStatistics;
private static int CORE_POOL_SIZE =
Runtime.getRuntime().availableProcessors();
private static int MAXIMUM_POOL_SIZE =
Runtime.getRuntime().availableProcessors();
private static long KEEP_ALIVE_TIME = 10;
private static RejectedTaskController REJECTED_TASK_CONTROLLER
= new RejectedTaskController();
public ServerExecutor() {
super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME,
TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
REJECTED_TASK_CONTROLLER);
startTimes = new ConcurrentHashMap<>();
executionStatistics = new ConcurrentHashMap<>();
}
// ,
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTimes.put(r, new Date());
}
// , ,
//
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
ServerTask> task = (ServerTask>) r;
ConcurrentCommand command = task.getCommand();
if (t == null) {
if (!task.isCancelled()) {
// startTimes
Date startDate = startTimes.remove(r);
Date endDate = new Date();
long executionTime = endDate.getTime() - startDate.getTime();
ExecutorStatistics statistics =
executionStatistics.computeIfAbsent
(command.getUsername(), n -> new ExecutorStatistics());
statistics.addExecutionTime(executionTime);
statistics.addTask();
// ConcurrentServer ,
ConcurrentServer.finishTask(command.getUsername(), command);
} else {
String message = "The task"
+ command.hashCode() + "of user"
+ command.getUsername() + "has been cancelled. ";
System.out.println(message);
}
} else {
String message = "The exception "
+ t.getMessage()
+ " has been thrown.";
System.out.println(message);
}
}
// executor Runnable ServerTask ,
@Override
protected RunnableFuture newTaskFor(Runnable runnable,
T value) {
return new ServerTask(runnable);
}
public void writeStatistics() {
for (Map.Entry entry : executionStatistics.entrySet()) {
String user = entry.getKey();
ExecutorStatistics stats = entry.getValue();
System.out.println(user + ":" + stats);
}
}
}
//
public abstract class Command {
protected String[] command;
public Command (String [] command) {
this.command=command;
}
public abstract String execute ();
}
/**
* ,
* 1.
* 2.
* 3.
*/
public abstract class ConcurrentCommand extends Command implements Comparable, Runnable {
private String username;
private byte priority;
private Socket socket;
public ConcurrentCommand(Socket socket, String[] command) {
super(command);
username=command[1];
priority=Byte.parseByte(command[2]);
this.socket=socket;
}
@Override
public abstract String execute();
@Override
public void run() {
String ret = execute();
try {
PrintWriter out = new
PrintWriter(socket.getOutputStream(),true);
out.println(ret);
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public int compareTo(ConcurrentCommand o) {
return Byte.compare(o.getPriority(), this.getPriority());
}
public byte getPriority() {
return priority;
}
public Socket getSocket() {
return socket;
}
public String getUsername() {
return username;
}
}
// Query
public class ConcurrentQueryCommand extends ConcurrentCommand {
public ConcurrentQueryCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
WDIDAO dao=WDIDAO.getDAO();
if (command.length==5) {
return dao.query(command[3], command[4]);
} else if (command.length==6) {
try {
return dao.query(command[3], command[4],
Short.parseShort(command[5]));
} catch (NumberFormatException e) {
return "ERROR;Bad Command";
}
} else {
return "ERROR;Bad Command";
}
}
}
// Report
public class ConcurrentReportCommand extends ConcurrentCommand {
public ConcurrentReportCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
WDIDAO dao=WDIDAO.getDAO();
return dao.report(command[3]);
}
}
// Stop
public class ConcurrentStopCommand extends ConcurrentCommand {
public ConcurrentStopCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
ConcurrentServer.shutdown();
return "Server stopped";
}
}
// Cancel
public class ConcurrentCancelCommand extends ConcurrentCommand {
public ConcurrentCancelCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
ConcurrentServer.cancelTasks(getUsername());
return message;
}
}
//
public class ConcurrentErrorCommand extends ConcurrentCommand {
public ConcurrentErrorCommand(Socket socket, String [] command) {
super(socket, command);
}
public String execute() {
return "Unknown command: " + command[0];
}
}
// status
public class ConcurrentStatusCommand extends ConcurrentCommand {
public ConcurrentStatusCommand (Socket socket, String[] command) {
super(socket, command);
}
@Override
public String execute() {
StringBuilder sb=new StringBuilder();
ThreadPoolExecutor executor = ConcurrentServer.getExecutor();
sb.append("Server Status;");
sb.append("Actived Threads: ");
sb.append(executor.getActiveCount());
sb.append(";");
sb.append("Maximum Pool Size: ");
sb.append(executor.getMaximumPoolSize());
sb.append(";");
sb.append("Core Pool Size: ");
sb.append(executor.getCorePoolSize());
sb.append(";");
sb.append("Pool Size: ");
sb.append(executor.getPoolSize());
sb.append(";");
sb.append("Largest Pool Size: ");
sb.append(executor.getLargestPoolSize());
sb.append(";");
sb.append("Completed Task Count: ");
sb.append(executor.getCompletedTaskCount());
sb.append(";");
sb.append("Task Count: ");
sb.append(executor.getTaskCount());
sb.append(";");
sb.append("Queue Size: ");
sb.append(executor.getQueue().size());
sb.append(";");
return sb.toString();
}
}
/**
* , RequestTask , ConcurrentServer socket,
* executor 。
* , executor
*/
public class ConcurrentServer {
private static volatile boolean stopped = false;
// sockets
private static LinkedBlockingQueue pendingConnections;
// executor Future , ,
// ConcurrentMap ( ConcurrentCommand, Future )
private static ConcurrentMap>> taskController;
// RequestTask Thread
private static Thread requestThread;
// executor
private static RequestTask task;
private static ServerSocket serverSocket;
public static void main(String[] args) {
pendingConnections = new LinkedBlockingQueue<>();
taskController = new ConcurrentHashMap>>();
// RequestTask
task = new RequestTask(pendingConnections, taskController);
requestThread = new Thread(task);
requestThread.start();
System.out.println("Initialization completed.");
serverSocket = new ServerSocket(Constants.CONCURRENT_PORT);
do {
try {
Socket clientSocket = serverSocket.accept();
pendingConnections.put(clientSocket);
} catch (Exception e) {
e.printStackTrace();
}
} while (!stopped);
finishServer();
}
// stopped true serverSocket
public static void shutdown() {
stopped = true;
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// executor RequestTask
private static void finishServer() {
System.out.println("Shutting down the server...");
task.shutdown();
System.out.println("Shutting down Request task");
requestThread.interrupt();
System.out.println("Request task ok");
System.out.println("Closing socket");
System.out.println("Shutting down logger");
System.out.println("Logger ok");
System.out.println("Main server thread ended");
}
//
public static void cancelTasks(String username) {
ConcurrentMap> userTasks = taskController.get(username);
if (userTasks == null) {
return;
}
int taskNumber = 0;
Iterator> it = userTasks.values().iterator();
while(it.hasNext()) {
ServerTask> task = it.next();
ConcurrentCommand command = task.getCommand();
if(!(command instanceof ConcurrentCancelCommand) &&
task.cancel(true)) {
taskNumber++;
it.remove();
}
}
}
// , Future ConcurrentMap
public static void finishTask(String username, ConcurrentCommand command) {
ConcurrentMap> userTasks
= taskController.get(username);
userTasks.remove(command);
}
}
public class RequestTask implements Runnable {
// sockets
private LinkedBlockingQueue pendingConnections;
//
private ServerExecutor executor = new ServerExecutor();
// Future
private ConcurrentMap>> taskController;
public RequestTask(LinkedBlockingQueue
pendingConnections, ConcurrentHashMap>> taskController) {
this.pendingConnections = pendingConnections;
this.taskController = taskController;
}
public void run() {
try {
while (!Thread.currentThread().interrupted()) {
try {
Socket clientSocket = pendingConnections.take();
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String line = in.readLine();
ConcurrentCommand command;
String[] commandData = line.split(";");
System.out.println("Command: " + commandData[0]);
switch (commandData[0]) {
case "q":
System.out.println("Query");
command = new ConcurrentQueryCommand(clientSocket, commandData);
break;
case "r":
System.out.println("Report");
command = new ConcurrentReportCommand(clientSocket, commandData);
break;
case "s":
System.out.println("Status");
command = new ConcurrentStatusCommand(executor, clientSocket, commandData);
break;
case "z":
System.out.println("Stop");
command = new ConcurrentStopCommand(clientSocket, commandData);
break;
case "c":
System.out.println("Cancel");
command = new ConcurrentCancelCommand(clientSocket, commandData);
break;
default:
System.out.println("Error");
command = new ConcurrentErrorCommand(clientSocket, commandData);
break;
}
ServerTask> controller = (ServerTask>) executor.submit(command);
storeContoller(command.getUsername(), controller, command);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
// No Action Required
}
}
// Future
private void storeContoller(String userName, ServerTask>controller, ConcurrentCommand command) {
taskController.computeIfAbsent(userName, k -> new ConcurrentHashMap>()).put(command, controller);
}
// executor
public void shutdown() {
String message = "Request Task: "
+ pendingConnections.size()
+ " pending connections.";
System.out.println(message);
executor.shutdown();
}
// executor
public void terminate() {
try {
executor.awaitTermination(1, TimeUnit.DAYS);
executor.writeStatistics();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Executorの追加の方法
次のExecutorメソッドを再ロードすることもできます.
ScheduledThreadPoolExecutorクラスでは、タスクの実行を遅らせたり、周期的なタスクを遅らせたりする方法があります.