スレッドプール消費MQメッセージキューソリューション


最近、私の手元にあるメールプラットフォームを最適化し、メッセージキューを使用してメールを送信するメッセージキューの傍受は、1回の送信を処理してから、2番目のメッセージコンテンツを取得することができることを発見しました.スレッドプールでMQ内のタスクを消費できるかどうか考えていましたが、問題が来て、スレッドプールを使うとスレッドプールがいっぱいになってからは断固として受信します.また、スレッドプールのキューに多くのメッセージが格納されていると、サービスを再起動するとメッセージが失われます. どうすればいいのか、考えてみれば、スレッドプールが自分でラインオフプールをカプセル化する実現を継承しよう.最初に思いついたのは、ThreadPoolExecuterを継承し、beforeExecuteとafterExecuteを書き換えて前後置処理を行うときにロックをかけ、ロックを解除してスレッドプールにタスクを実行する前にブロックさせることです.
public class BlockThreadPoolExecute extends ThreadPoolExecutor {
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try{
            lock.lock();
            this.condition.signal();
        }finally {
            this.lock.unlock();
        }
    }
}

 試験後、beforeExecuteは機能しないことが分かった.ソースコードを見てからbeforeExecuteメソッドはスレッドタスクが実行されるときに実行されることに気づきました.その結果、ブロックされたのは新しいスレッドで、MQがスレッドプールにタスクを置くことに影響しません.beforeExecuteは使えないので、仕方なくオンラインプールで実行する方法で文章を書くしかありません.
/**
 *      
 *                   
 *         MQ    
 *      ,               
 */
public class BlockThreadPoolExecute extends ThreadPoolExecutor {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();

    public BlockThreadPoolExecute(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable command) {
        //      
        this.lock.lock();
        super.execute(command);
        try {
            //                    ,       
            if (getPoolSize() == getMaximumPoolSize()) {
                this.condition.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            this.lock.unlock();
        }
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try{
            lock.lock();
            this.condition.signal();
        }finally {
            this.lock.unlock();
        }


    }
}

テスト済み
public class TestThreadPools {
/*          SynchronousQueue       ,         */
public static final  ExecutorService poolExecuter = new BlockThreadPoolExecute(3
			, 3 , 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
    public static void main(String[]args) {
        
        for (int i=0;i<100;i++) {
            poolExecuter.execute(()->{
                String threadName= Thread.currentThread().getName();
                System.out.println(threadName+"    ");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadName+"    ");
            });
        }

    }
}

実行結果
pool-1-thread-1    1562132691801
pool-1-thread-3    1562132691801
pool-1-thread-2    1562132691801
pool-1-thread-1    1562132693802
pool-1-thread-3    1562132693802
pool-1-thread-2    1562132693802
pool-1-thread-1    1562132693803
pool-1-thread-3    1562132693803
pool-1-thread-3    1562132695807
pool-1-thread-2    1562132695808
pool-1-thread-1    1562132695809
pool-1-thread-3    1562132695809
pool-1-thread-2    1562132697809
pool-1-thread-1    1562132697809
pool-1-thread-3    1562132697810
pool-1-thread-2    1562132697810

スレッドプールがブロックされていることを確認し、空きスレッドがある場合にのみ実行されるので、メッセージキューの消費メッセージを安心して入手できます.