非同期スレッドプールThreadPoolTaskExecutorによる一括処理
ケース:ユーザーが商品リストを検索した結果,セットは約100 Wの商品があり,クリックしてロットアップ/ダウンを行った.
一、非同期スレッドプールの構成
1.springboot
クラスの設定
プロファイルアプリケーション.yml
2.spring
application-context.xml
二、業務層が非同期スレッドプールを導入する
ここで、BatchUpdatePrdByParamsExecutorクラスはRunnableインタフェースの実装クラスであり、そのビジネスロジックに必要なデータとオブジェクトは、すべてコンストラクタによって伝達される.なお、ここでバックグラウンドスレッドに入ると、要求はすぐにユーザに応答するので、ユーザが応答結果を得てもデータがまだ処理されていない現象を避けるためには、ユーザ応答ページの設定待ちの効果(猿友を倒すことは難しいと信じている)が望ましい.
三、スレッド類処理
一、非同期スレッドプールの構成
1.springboot
クラスの設定
package ***;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
*
* @author yanxh
*
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig {
/**
*
*/
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
/**
*
*/
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
/**
*
*/
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
/**
*
*/
@Value("${async.executor.thread.keep_alive_seconds}")
private int keepAliveSeconds;
/**
* ( )
*/
private CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
private String threadNamePrefix = "AsyncExecutorThread-";
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.setThreadNamePrefix(threadNamePrefix);
executor.setRejectedExecutionHandler(callerRunsPolicy);
executor.initialize();
return executor;
}
}
プロファイルアプリケーション.yml
#
async:
executor:
thread:
core_pool_size : 10
max_pool_size : 50
queue_capacity : 1000
keep_alive_seconds : 300
2.spring
application-context.xml
二、業務層が非同期スレッドプールを導入する
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
/**
*
* @param params
*/
public void batchUpdatePrdByParams(Map params){
taskExecutor.execute(new BatchUpdatePrdByParamsExecutor(redisClient, txManager, prdBaseMapper));
}
ここで、BatchUpdatePrdByParamsExecutorクラスはRunnableインタフェースの実装クラスであり、そのビジネスロジックに必要なデータとオブジェクトは、すべてコンストラクタによって伝達される.なお、ここでバックグラウンドスレッドに入ると、要求はすぐにユーザに応答するので、ユーザが応答結果を得てもデータがまだ処理されていない現象を避けるためには、ユーザ応答ページの設定待ちの効果(猿友を倒すことは難しいと信じている)が望ましい.
三、スレッド類処理
package ....;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import com.chongdong.data.entity.PrdBase;
import com.chongdong.data.mapper.PrdBaseMapper;;
import com.github.pagehelper.PageHelper;
/**
* @author yanxh
*
*/
public class BatchUpdatePrdByParamsExecutorimplements Runnable {
Logger log = LoggerFactory.getLogger(BatchUpdatePrdByParamsExecutor.class);
private DataSourceTransactionManager txManager;
private PrdBaseMapper prdBaseMapper;
public BatchUpdatePrdByParamsExecutor(DataSourceTransactionManager txManager, PrdBaseMapper prdBaseMapper) {
this.txManager = txManager;
this.prdBaseMapper = prdBaseMapper;
}
@Override
public void run() {
int pageSize = 1000; //
while (true) {
/**
*
*/
PageHelper.startPage(1, pageSize, false);
List baseResult = ;
if (CollectionUtils.isEmpty(baseResult)) {
break;
}
// spring thread ,
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
TransactionStatus rollbackPoint = txManager.getTransaction(def);//
try {
//
txManager.commit(rollbackPoint);
/**
* ,
*/
if (baseResult.size() < pageSize) {
break;
}
} catch (Exception e) {
log.error(e.getMessage());
e.printStackTrace();
//
txManager.rollback(rollbackPoint);
}
}
}
}
, , , , , , , , , , , , , , 。