第三章-Executorsの最大化


もっと読む
Executorsのいくつかの高度な特性
タスクのキャンセル
タスクをexecutorに送信すると、このタスクの実行をキャンセルすることができます.submit()メソッドを使用してRunnableオブジェクトをexecutorに送信すると、submit()メソッドはFutureというインタフェースクラスを実装したオブジェクトを返します.このクラスのcancel()メソッドでタスクの実行をキャンセルできます.cancel()メソッドはboolean値をパラメータとして受信します.パラメータがtrueでexecutorがこのタスクを実行している場合、タスクを実行するスレッドは中断されます.
cancel()メソッドは、タスクがキャンセルされたかどうかを示すboolean値を返します.
 
タスクスケジューリング
ThreadPoolExecutorクラスは、インタフェースExecutorとExecutorServiceの基本的な実装クラスです.同時にJavaは、タスクのスケジューリングを実現するための拡張クラスを提供します.これにより、次のことができます.
  • タスクの実行を遅延する
  • は、一定の周波数でタスクを実行することと、一定の遅延時間でタスク
  • を実行することとを含む周期的にタスクを実行する.
    executorを再ロードする方法
    既存のクラス(ThreadPoolExecutorまたはS h e d u l edThreadPoolExecutor)を継承することで、カスタムexecutorを実現できます.ThreadPoolExecutorクラスを継承している場合は、次の方法を再ロードできます.
  • beforeExecute():このメソッドはexecuter内のパラレルタスクが実行される前に呼び出されます.このメソッドは、実行されるRunnableオブジェクトと、実行を担当するThreadオブジェクトの2つのパラメータを受信します.このメソッドで受信したRunnableオブジェクトは、実際にはクラスFutureTaskのインスタンスであり、submit()メソッドでexecutorのRunnableオブジェクトに送信されたわけではありません(このオブジェクトはFutureTaskにパッケージされています).
  • afterExecute():このメソッドはexecuterのパラレルタスクが実行された後に呼び出されます.実行されたRunnableオブジェクトと、タスクから放出される可能性のある例外を保存する2つのパラメータを受信します.beforeExecute()メソッドに似ており、RunnableオブジェクトはFutureTaskクラスのインスタンスです.
  • newTaskFor():このメソッドは、submit()メソッドで送信されたRunnableオブジェクトを実行するためのタスクを作成します.RunnableFutureインタフェースの実装を返さなければなりません.デフォルトでは、OpenJDK 8とOracle JDK 8はFutureTaskクラスのインスタンスを返しますが、これは将来のJDKバージョンで異なる場合があります.

  • ScheduledThreadPoolExecutorクラスを継承している場合は、decorateTask()メソッドを再ロードできます.この方法は上のnewTaskFor()のようなものであるが,スケジューリングタスクに対するものである.executorによって実行されたタスクを再ロードできます.
     
    初期化パラメータを変更
    いくつかのパラメータを変更してexecutorの動作を変更することもできます.次のような機能があります.
  • BlockingQueue:各executorの内部では、実行されるタスクを保存するためにBlockingQueueが維持されます.インタフェースのいずれかの実装クラスに転送できます.たとえば、executorがタスクを実行するデフォルトの順序を変更できます.
  • ThreadFactory:このカスタムスレッドファクトリを使用して、タスクを実行するためのスレッドを作成するThreadFactoryインタフェースを実装したカスタムクラスを入力できます.たとえば、カスタムThredFactoryクラスが返すカスタムスレッドは、各タスクの実行時間をログに保存できます.
  • RejectedExecutionHandler:shutdown()またはshutdownNow()メソッドを呼び出すと、executorに送信される新しいタスクはすべて拒否されます.この場合、RejectedExecutionHandlerインタフェースを実装したカスタムクラスに転送して処理できます.

  • 最初の例-サーバアプリケーション
    第2章では,クライアント/サーバ側アプリケーションを実現した.この例では、そのアプリケーションを次のように拡張します.
     
  • は、サーバに送信された要求
  • をキャンセルするための新しい要求を導入する.
  • 各要求は、要求の優先度を表す新しいパラメータを渡すことを可能にする.要求を制御ための実行順序
  • サーバは、ユーザごとに実行する要求の合計数と、実行する総消費時間
  • とを計算することができる.
     
    //                           
    //           ,                 
    public class ExecutorStatistics {
        private AtomicLong executionTime = new AtomicLong(0L);
        private AtomicInteger numTasks = new AtomicInteger(0);
    
        public void addExecutionTime(long time) {
            executionTime.addAndGet(time);
        }
    
        public void addTask() {
            numTasks.incrementAndGet();
        }
    
        @Override
        public String toString() {
            return "Executed Tasks: " + getNumTasks() + 
                    ". Execution Time: "+ getExecutionTime();
        }
    
        public AtomicLong getExecutionTime() {
            return executionTime;
        }
    
        public AtomicInteger getNumTasks() {
            return numTasks;
        }
    }
     
     
     
    //  executor   shutdown() shutdownNow() ,executor         
    //            ,                  
    public class RejectedTaskController implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task,
                                      ThreadPoolExecutor executor) {
            ConcurrentCommand command = (ConcurrentCommand) task;
            Socket clientSocket = command.getSocket();
            try {
                PrintWriter out = new
                        PrintWriter(clientSocket.getOutputStream(), true);
                String message = "The server is shutting down."
                        + " Your request can not be served."
                        + " Shutting Down: "
                        + executor.isShutdown()
                        + ". Terminated: "
                        + executor.isTerminated()
                        + ". Terminating: "
                        + executor.isTerminating();
                out.println(message);
                out.close();
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
     
     
     
    /**
     *    Runnable     executor ,executor        Runnable  
     *       FutureTask      ,    executor       
     *        FutureTask     Comparable  ,       executor
     *              (    )
     */
    public class ServerTask extends FutureTask implements Comparable> {
        private ConcurrentCommand command;
        public ServerTask(ConcurrentCommand command) {
            super(command, null);
            this.command=command;
        }
    
        public ConcurrentCommand getCommand() {
            return command;
        }
        public void setCommand(ConcurrentCommand command) {
            this.command = command;
        }
    
        @Override
        public int compareTo(ServerTask other) {
            return command.compareTo(other.getCommand());
        }
    }
     
     
    //   executor,              
    public class ServerExecutor extends ThreadPoolExecutor {
        //            ,   ServerTask  ( Runnable  )
        //         
        private ConcurrentHashMap startTimes;
    
        //                ,      
        //    ExecutorStatistics  
        private ConcurrentHashMap
                executionStatistics;
    
        private static int CORE_POOL_SIZE =
                Runtime.getRuntime().availableProcessors();
        private static int MAXIMUM_POOL_SIZE =
                Runtime.getRuntime().availableProcessors();
        private static long KEEP_ALIVE_TIME = 10;
        private static RejectedTaskController REJECTED_TASK_CONTROLLER
                = new RejectedTaskController();
    
        public ServerExecutor() {
            super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME,
                    TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
                    REJECTED_TASK_CONTROLLER);
            startTimes = new ConcurrentHashMap<>();
            executionStatistics = new ConcurrentHashMap<>();
        }
    
        //           ,             
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startTimes.put(r, new Date());
        }
    
        //           ,               ,
        //                   
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            ServerTask> task = (ServerTask>) r;
            ConcurrentCommand command = task.getCommand();
            if (t == null) {
                if (!task.isCancelled()) {
                    //    startTimes             
                    Date startDate = startTimes.remove(r);
                    Date endDate = new Date();
                    long executionTime = endDate.getTime() - startDate.getTime();
                    ExecutorStatistics statistics =
                            executionStatistics.computeIfAbsent
                                    (command.getUsername(), n -> new ExecutorStatistics());
                    statistics.addExecutionTime(executionTime);
                    statistics.addTask();
    
                    //  ConcurrentServer              ,       
                    ConcurrentServer.finishTask(command.getUsername(), command);
                } else {
                    String message = "The task"
                            + command.hashCode() + "of user"
                            + command.getUsername() + "has been cancelled. ";
                    System.out.println(message);
                }
            } else {
                String message = "The exception "
                        + t.getMessage()
                        + " has been thrown.";
                System.out.println(message);
            }
        }
    
        //     executor Runnable     ServerTask  ,             
        @Override
        protected  RunnableFuture newTaskFor(Runnable runnable,
                                                   T value) {
            return new ServerTask(runnable);
        }
    
        public void writeStatistics() {
            for (Map.Entry entry : executionStatistics.entrySet()) {
                String user = entry.getKey();
                ExecutorStatistics stats = entry.getValue();
                System.out.println(user + ":" + stats);
            }
        }
    }
     
     
     
    //       
    public abstract class Command {
        protected String[] command;
        public Command (String [] command) {
            this.command=command;
        }
        public abstract String execute ();
    }
    
    /**
     *            ,               
     * 1.               
     * 2.            
     * 3.             
     */
    public abstract class ConcurrentCommand extends Command implements Comparable, Runnable {
        private String username;
        private byte priority;
        private Socket socket;
        public ConcurrentCommand(Socket socket, String[] command) {
            super(command);
            username=command[1];
            priority=Byte.parseByte(command[2]);
            this.socket=socket;
        }
    
        @Override
        public abstract String execute();
    
        @Override
        public void run() {
            String ret = execute();
    
            try {
                PrintWriter out = new
                        PrintWriter(socket.getOutputStream(),true);
                out.println(ret);
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public int compareTo(ConcurrentCommand o) {
            return Byte.compare(o.getPriority(), this.getPriority());
        }
    
        public byte getPriority() {
            return priority;
        }
    
        public Socket getSocket() {
            return socket;
        }
    
        public String getUsername() {
            return username;
        }
    }
    
    //   Query      
    public class ConcurrentQueryCommand extends ConcurrentCommand {
        public ConcurrentQueryCommand(Socket socket, String [] command) {
            super(socket, command);
        }
    
        public String execute() {
            WDIDAO dao=WDIDAO.getDAO();
            if (command.length==5) {
                return dao.query(command[3], command[4]);
            } else if (command.length==6) {
                try {
                    return dao.query(command[3], command[4],
                            Short.parseShort(command[5]));
                } catch (NumberFormatException e) {
                    return "ERROR;Bad Command";
                }
            } else {
                return "ERROR;Bad Command";
            }
        }
    }
    
    //  Report      
    public class ConcurrentReportCommand extends ConcurrentCommand {
        public ConcurrentReportCommand(Socket socket, String [] command) {
            super(socket, command);
        }
    
        public String execute() {
            WDIDAO dao=WDIDAO.getDAO();
            return dao.report(command[3]);
        }
    }
    
    //  Stop      
    public class ConcurrentStopCommand extends ConcurrentCommand {
        public ConcurrentStopCommand(Socket socket, String [] command) {
            super(socket, command);
        }
    
        public String execute() {
            ConcurrentServer.shutdown();
            return "Server stopped";
        }
    }
    
    //  Cancel      
    public class ConcurrentCancelCommand extends ConcurrentCommand {
        public ConcurrentCancelCommand(Socket socket, String [] command) {
            super(socket, command);
        }
    
        public String execute() {
            ConcurrentServer.cancelTasks(getUsername());
            return message;
        }
    }
    
    //               
    public class ConcurrentErrorCommand extends ConcurrentCommand {
        public ConcurrentErrorCommand(Socket socket, String [] command) {
            super(socket, command);
        }
    
        public String execute() {
            return "Unknown command: " + command[0];
        }
    }
    
    //  status     
    public class ConcurrentStatusCommand extends ConcurrentCommand {
        public ConcurrentStatusCommand (Socket socket, String[] command) {
            super(socket, command);
        }
    
        @Override
        public String execute() {
            StringBuilder sb=new StringBuilder();
            ThreadPoolExecutor executor = ConcurrentServer.getExecutor();
            sb.append("Server Status;");
            sb.append("Actived Threads: ");
            sb.append(executor.getActiveCount());
            sb.append(";");
            sb.append("Maximum Pool Size: ");
            sb.append(executor.getMaximumPoolSize());
            sb.append(";");
            sb.append("Core Pool Size: ");
            sb.append(executor.getCorePoolSize());
            sb.append(";");
            sb.append("Pool Size: ");
            sb.append(executor.getPoolSize());
            sb.append(";");
            sb.append("Largest Pool Size: ");
            sb.append(executor.getLargestPoolSize());
            sb.append(";");
            sb.append("Completed Task Count: ");
            sb.append(executor.getCompletedTaskCount());
            sb.append(";");
            sb.append("Task Count: ");
            sb.append(executor.getTaskCount());
            sb.append(";");
            sb.append("Queue Size: ");
            sb.append(executor.getQueue().size());
            sb.append(";");
            return sb.toString();
        }
    }
     
     
    /**
     *      ,     RequestTask    ,      ConcurrentServer      socket,
     *           executor  。
     *                              ,       executor   
     */
    public class ConcurrentServer {
        private static volatile boolean stopped = false;
    
        //                 sockets
        private static LinkedBlockingQueue pendingConnections;
    
        //       executor          Future  ,      ,
        //        ConcurrentMap (     ConcurrentCommand,         Future  )
        private static ConcurrentMap>> taskController;
    
        //   RequestTask   Thread
        private static Thread requestThread;
    
        //           executor
        private static RequestTask task;
    
        private static ServerSocket serverSocket;
    
        public static void main(String[] args) {
            pendingConnections = new LinkedBlockingQueue<>();
            taskController = new ConcurrentHashMap>>();
    
            //   RequestTask  
            task = new RequestTask(pendingConnections, taskController);
            requestThread = new Thread(task);
            requestThread.start();
    
            System.out.println("Initialization completed.");
    
            serverSocket = new ServerSocket(Constants.CONCURRENT_PORT);
            do {
                try {
                    Socket clientSocket = serverSocket.accept();
                    pendingConnections.put(clientSocket);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } while (!stopped);
            finishServer();
        }
    
        //      stopped   true   serverSocket
        public static void shutdown() {
            stopped = true;
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        //      executor   RequestTask  
        private static void finishServer() {
            System.out.println("Shutting down the server...");
            task.shutdown();
            System.out.println("Shutting down Request task");
            requestThread.interrupt();
            System.out.println("Request task ok");
            System.out.println("Closing socket");
            System.out.println("Shutting down logger");
            System.out.println("Logger ok");
            System.out.println("Main server thread ended");
        }
    
        //          
        public static void cancelTasks(String username) {
            ConcurrentMap> userTasks = taskController.get(username);
            if (userTasks == null) {
                return;
            }
            int taskNumber = 0;
            Iterator> it = userTasks.values().iterator();
            while(it.hasNext()) {
                ServerTask> task = it.next();
                ConcurrentCommand command = task.getCommand();
                if(!(command instanceof ConcurrentCancelCommand) &&
                        task.cancel(true)) {
                    taskNumber++;
                    it.remove();
                }
            }
        }
    
        //           ,             Future      ConcurrentMap   
        public static void finishTask(String username, ConcurrentCommand command) {
            ConcurrentMap> userTasks
                    = taskController.get(username);
            userTasks.remove(command);
        }
    }
    
    
    public class RequestTask implements Runnable {
        //       sockets
        private LinkedBlockingQueue pendingConnections;
    
        //              
        private ServerExecutor executor = new ServerExecutor();
    
        //          Future  
        private ConcurrentMap>> taskController;
    
        public RequestTask(LinkedBlockingQueue
                                   pendingConnections, ConcurrentHashMap>> taskController) {
            this.pendingConnections = pendingConnections;
            this.taskController = taskController;
        }
    
        public void run() {
            try {
                while (!Thread.currentThread().interrupted()) {
                    try {
                        Socket clientSocket = pendingConnections.take();
                        BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                        String line = in.readLine();
                        ConcurrentCommand command;
    
                        String[] commandData = line.split(";");
                        System.out.println("Command: " + commandData[0]);
                        switch (commandData[0]) {
                            case "q":
                                System.out.println("Query");
                                command = new ConcurrentQueryCommand(clientSocket, commandData);
                                break;
                            case "r":
                                System.out.println("Report");
                                command = new ConcurrentReportCommand(clientSocket, commandData);
                                break;
                            case "s":
                                System.out.println("Status");
                                command = new ConcurrentStatusCommand(executor, clientSocket, commandData);
                                break;
                            case "z":
                                System.out.println("Stop");
                                command = new ConcurrentStopCommand(clientSocket, commandData);
                                break;
                            case "c":
                                System.out.println("Cancel");
                                command = new ConcurrentCancelCommand(clientSocket, commandData);
                                break;
                            default:
                                System.out.println("Error");
                                command = new ConcurrentErrorCommand(clientSocket, commandData);
                                break;
                        }
                        ServerTask> controller = (ServerTask>) executor.submit(command);
                        storeContoller(command.getUsername(), controller, command);
    
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } catch (InterruptedException e) {
                // No Action Required
            }
        }
    
        //           Future  
        private void storeContoller(String userName, ServerTask>controller, ConcurrentCommand command) {
            taskController.computeIfAbsent(userName, k -> new ConcurrentHashMap>()).put(command, controller);
        }
    
        //   executor
        public void shutdown() {
            String message = "Request Task: "
                    + pendingConnections.size()
                    + " pending connections.";
            System.out.println(message);
            executor.shutdown();
        }
    
        //   executor            
        public void terminate() {
            try {
                executor.awaitTermination(1, TimeUnit.DAYS);
                executor.writeStatistics();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
     
    Executorの追加の方法
    次のExecutorメソッドを再ロードすることもできます.
  • shutdown():このメソッドを呼び出してexecutorを終了する必要があります.この方法を再ロードして、追加のリソースを解放することができます.この方法は、executorがキューに並んで実行されるすべてのタスクを処理するのを待つ
  • である.
  • shutdownNow():sutdown()メソッドとは異なり、executorがキュー中のタスク
  • を処理するのを待たない
  • submit()、invokeall()、invokeany():これらのメソッドを呼び出してexecutorに同時タスクを送信します.タスクをexecutorに追加するタスクキューの前または後に追加の操作が必要な場合は、それらを再ロードできます.タスクがキューに挿入される前または後に実行される追加の操作は、タスクの実行前または実行後とは異なり、タスクの実行前後に追加の操作を行う場合は、beforeExecute()メソッドとafterExecute()メソッドを再ロードする必要があります.

  • ScheduledThreadPoolExecutorクラスでは、タスクの実行を遅らせたり、周期的なタスクを遅らせたりする方法があります.
  • schedule():この方法では、指定された遅延後にタスクを実行できます.タスクは一度だけ実行されます.
  • scheduleAtFixedRate():この方法では、指定された遅延後にタスクを周期的に実行できます.これは、scheduleWithFixedDelay()とは異なり、scheduleWithFixedDelay()メソッドの場合、2回の実行間隔は、前回の実行が終了してから次の実行が開始されるまでの間隔です.scheduleAtFixedRate()の場合、2回の実行間隔は、前回の実行開始から次の実行開始までの間隔です.