【妄想】JNIとスレッドプールのメンテナンス

13449 ワード

JNIではC/C++コードで作成されたリソースはJava GCで処理されないため,ここでのリソースはC/C++コードで明確に解放されなければならない.JNIでは、C/C++コールバックJavaのメソッドはCallXXXmethod関数を呼び出して実装され、コールバックのメソッドが終了すると、C/C++は次の行のコードを実行します.
したがって,C/C++によって作成されたOSスレッドはrunメソッドを実行した後に解放されるはずであり,そうでなければスレッドを解放する他の適切な時点もないようであると推測される.意味的には、スレッドのタスクが完了した以上、スレッドが何をしているのか、解放されるべきだからです.
スレッドの作成と解放のオーバーヘッドを低減するために、スレッドプールを維持する必要がある場合があります.一部のスレッドが現在のタスクを完了した後、スレッドプールに戻され、次のタスクを受け入れるのを待つ必要があります.前述の推測によればrunメソッドが終了するとOSスレッドが解放される.スレッドプールを維持するには、スレッドを解放できません.だからrunメソッドの戻りを阻止します.その時ルーンはtarget.run実行が完了したらwaitを利用してスレッドを掛けます.新しいtargetがあるまで、現在のスレッドが起動しません.
以下は、旧版『Thinking in Enterprise Java』で見たスレッドプールの実装です.
public class Worker extends Thread { //     
    public static final Logger logger = Logger.setLogger("Worker"); //   
    private String workerId; //   ID
    private Runnable task;   //    
    private ThreadPool threadPool;  //     ,    。

    static { //   ,  logger
        try {
            logger.setUseParentHandlers(false);
            FileHandler ferr = new FileHandler("WorkerErr.log");
            ferr.setFormatter(new SimpleFormatter());
            logger.addHandler(ferr);
        } catch(IOException e) {
            System.out.println("Logger not initialized.");
        }
    }

    public Worker(String id, ThreadPool pool) {
        workerId = id;
        threadPool = pool;
        start();    //     
    }

    public void setTask(Runnable t) { //       ,     ,      。
        task = t;
        synchronized(this) {
            notify(); //wait、notify           
        }
    }

    public void run() {
        try {
            while(!threadPool.isStopped()) { //
                synchronized(this) {
                    if(task != null) {
                        try {
                            task.run(); //    
                        } catch(Exception e) {
                            logger.log(Level.SERVER, "Exception in source Runnable task", e);
                        }
                        threadPool.putWorker(this); //
                    }
                    wait(); //         ,    。          。

                }
            }
            //    ,          ,         
            System.out.println(this + " Stopped");
        } catch(InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public String toString() {
        return "Worker: " + workerId;
    }

}
public class ThreadPool extends Thread { //   ,        ,         
    private static final int DEFAULT_NUM_WORKERS = 5; //        5
    private LinkedList workerPool = new LinkedList(); //       
    private LinkedList taskQueue = new LinkedList();  //    
    private boolean stopped = false; //        
    
    public ThreadPool() { //
        this(DEFAULT_NUM_WORKERS);
    }

    public ThreadPool(int numOfWorkers) { //        
        for(int i=0;i){
            workerPool.add(new Worker("" + i, this));
        }
        start(); //     
    }

    public void run() { //    
        try {
            while(!stopped) {
                if(taskQueue.isEmpty()) { //        ,      。                 
                    synchronized(taskQueue) {
                        taskQueue.wait(); //         wait  ,            。
                    }
                } else if(workerPool.isEmpty()) { //         ,         。
                    synchronized(workerPool) {
                        workerPool.wait();
                    }
                }
                //   ,           =>             ,              
                getWorker().setTask((Runnable)taskQueue.removeLast());
            }
        } catch(InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void addTask(Runnable task) {
        synchronized(taskQueue) {
            taskQueue.addFirst(task);
            taskQueue.notify(); //       ,             ,         
        }
    }

    public void putWorker(Worker worker) {
        synchronized(workerPool) {
            workerPool.addFirst(worker);
            workerPool.notify(); //         ,                  ,        
        }
    }

    public Worker getWorker() {
        return (Worker) workerPool.removeLast(); //        ,       。
    }

    public boolean isStopped() {
        return stopped;
    }

    public void stopThreads() { //     
        stopped = true;
        Iterator it = workerPool.Iterator();
        //            ,        ThreadPool   ,   run   =>   OS  
        while(it.hasNext()) { 
            Worker w = (Worker)it.next();
            synchronized(w) {
                w.notify();
            }
        }
    }

}

このコードに対する認識は,注釈にはっきりと表現されている.また、ThreadPoolとWokerの関係は少しオブザーバーモードの味がして、Wokerはオブザーバー、ThreadPoolはオブザーバー/テーマだと思います.しかし、標準的なオブザーバーモードとは異なり、ThreadPoolは新しいタスクを受け入れ(変更された)、すべてのWorkerに通知しなかった.