非同期スレッドプールThreadPoolTaskExecutorによる一括処理


ケース:ユーザーが商品リストを検索した結果,セットは約100 Wの商品があり,クリックしてロットアップ/ダウンを行った.
 
一、非同期スレッドプールの構成
1.springboot
クラスの設定
package ***;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 *      
 * @author yanxh
 *
 */
@Configuration
@EnableAsync
public class ThreadPoolConfig {

    /**
     *      
     */
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;

    /**
     *      
     */
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;

    /**
     *       
     */
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;

    /**
     *                
     */
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;

    /**
     *         (     )     
     */
    private CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();

    private String threadNamePrefix = "AsyncExecutorThread-";

    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setRejectedExecutionHandler(callerRunsPolicy);
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setRejectedExecutionHandler(callerRunsPolicy);
        executor.initialize();
        return executor;
    }
    
}

プロファイルアプリケーション.yml
#     
async:
  executor:
    thread:
      core_pool_size : 10
      max_pool_size : 50
      queue_capacity : 1000
      keep_alive_seconds : 300

 
2.spring
application-context.xml
    
	
		
		
		
		
		
		
		
		
		
		
			
		
	

 
二、業務層が非同期スレッドプールを導入する
	@Autowired
	private ThreadPoolTaskExecutor taskExecutor;


	/**
	 *       
	 * @param params
	 */
	public void batchUpdatePrdByParams(Map params){
		taskExecutor.execute(new BatchUpdatePrdByParamsExecutor(redisClient, txManager, prdBaseMapper));
	}

ここで、BatchUpdatePrdByParamsExecutorクラスはRunnableインタフェースの実装クラスであり、そのビジネスロジックに必要なデータとオブジェクトは、すべてコンストラクタによって伝達される.なお、ここでバックグラウンドスレッドに入ると、要求はすぐにユーザに応答するので、ユーザが応答結果を得てもデータがまだ処理されていない現象を避けるためには、ユーザ応答ページの設定待ちの効果(猿友を倒すことは難しいと信じている)が望ましい.
 
三、スレッド類処理
package ....;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import com.chongdong.data.entity.PrdBase;
import com.chongdong.data.mapper.PrdBaseMapper;;
import com.github.pagehelper.PageHelper;

/**
 * @author yanxh
 * 
 */
public class BatchUpdatePrdByParamsExecutorimplements Runnable {

	Logger log = LoggerFactory.getLogger(BatchUpdatePrdByParamsExecutor.class);
    
	private DataSourceTransactionManager txManager;

	private PrdBaseMapper prdBaseMapper;
	
	public BatchUpdatePrdByParamsExecutor(DataSourceTransactionManager txManager, PrdBaseMapper prdBaseMapper) {
		this.txManager = txManager;
		this.prdBaseMapper = prdBaseMapper;
	}

	@Override
	public void run() {
		
		int pageSize = 1000; //        
		while (true) {
			/**
			 *          
			 */
			PageHelper.startPage(1, pageSize, false);
			List baseResult =            ;
			if (CollectionUtils.isEmpty(baseResult)) {
				break;
			}
			// spring    thread   ,       
			DefaultTransactionDefinition def = new DefaultTransactionDefinition();
			def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
			def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
			TransactionStatus rollbackPoint = txManager.getTransaction(def);//      
			try {

				       


				//     
				txManager.commit(rollbackPoint);
				/**
				 *           ,    
				 */
				if (baseResult.size() < pageSize) {
					break;
				}
			} catch (Exception e) {
				log.error(e.getMessage());
				e.printStackTrace();
				//     
				txManager.rollback(rollbackPoint);
			}
		}
	}
}
       ,           ,      ,        ,    ,      ,        ,    ,       ,              ,                  ,            ,              ,             ,       。