スレッドプールとキャッシュキューで実装した非同期スレッドマネージャ ThreadPoolManager のソースコードを紹介します。
5059 ワード
ビジネスシーン
ソース:
A B , B , B , , :
ソース:
/**
* com.hshc.threadpool.ThreadPoolManager.java
* Copyright 2017 Lifangyu, Inc. All rights reserved.
* PROPRIETARY/CONFIDENTIAL.Use is subject to license terms.
*/
package com.hshc.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.*;
/**
* Desc:
*
* @author lifangyu
* @date 2017/12/8.
*/
@Slf4j
public class ThreadPoolManager {
private static ThreadPoolManager instance;
/**
*
*/
private static int COREPOOLSIZE = 4;
/**
*
*/
private static int MAXPOOLSIZE = 10;
/**
*
*/
private static int KEEPALIVETIME = 0;
/**
*
*/
private static int WORKQUEUESIZE = 10;
/**
*
*/
Queue msgQueue = new LinkedList<>();
/**
* , [ ]
*
* @param corepoolsize
* @param maxpoolsize
* @param keepalivetime
* @param workqueuesize
* @return ThreadPoolManager
* @author lifangyu
*/
public static ThreadPoolManager newInstance(int corepoolsize, int maxpoolsize, int keepalivetime, int workqueuesize) {
COREPOOLSIZE = corepoolsize;
MAXPOOLSIZE = maxpoolsize;
KEEPALIVETIME = keepalivetime;
WORKQUEUESIZE = workqueuesize;
instance = new ThreadPoolManager();
return instance;
}
/**
* [ ]
*
* @return ThreadPoolManager
* @author lifangyu
*/
public static ThreadPoolManager getInstance() {
if (instance == null) {
synchronized (new Object()) {
if (instance == null) {
instance = new ThreadPoolManager();
}
}
}
return instance;
}
/**
* [ ]
* , , AccessDBThread,
*/
final Runnable accessBufferThread = new Runnable() {
@Override
public void run() {
if (hasMoreAcquire()) {
T vo = (T) msgQueue.poll();
Runnable task = new CallBackThread(vo);
threadPool.execute(task);
}
}
};
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info(" ID:{} execute thread [{}] ", Thread.currentThread().getId(), ((CallBackThread) r).getVo().toString());
msgQueue.offer(((CallBackThread) r).getVo());
}
};
/**
*
*/
@SuppressWarnings({"rawtypes", "unchecked"})
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(COREPOOLSIZE, MAXPOOLSIZE, KEEPALIVETIME,
TimeUnit.SECONDS, new ArrayBlockingQueue(WORKQUEUESIZE), this.handler);
/**
*
*/
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(100);
@SuppressWarnings("rawtypes")
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBufferThread, 0, 1, TimeUnit.SECONDS);
private boolean hasMoreAcquire() {
return !msgQueue.isEmpty();
}
public void noticeCallBack(String url,T vo) {
Runnable task = new CallBackThread(url,vo);
threadPool.execute(task);
}
}
/**
* com.hshc.threadpool.CallBackThread.java
* Copyright 2017 Lifangyu, Inc. All rights reserved.
* HuaShengHaoChe PROPRIETARY/CONFIDENTIAL.Use is subject to license terms.
*/
package com.hshc.threadpool;
import com.hshc.common.utils.PropertiesUtil;
import com.hshc.common.utils.RestHttpUtil;
import com.hshc.wh.eas.domain.EasVoucherInstorageRequestVo;
import com.hshc.wh.eas.domain.EasVoucherMaterialRequestVo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
 * Runnable that POSTs a payload to a URL through {@code RestHttpUtil.sendPost} and logs the
 * call before and after it is made.
 *
 * @param <T> type of the payload
 * @author lifangyu
 * @date 2017/12/8.
 */
@Slf4j
@Data
public class CallBackThread<T> implements Runnable {

    /** Target URL of the POST; stays null when the one-argument constructor finds no match. */
    private String url;

    /** Payload handed to RestHttpUtil.sendPost. */
    private T vo;

    /**
     * Creates a task whose URL is looked up from configuration according to the payload type.
     * For a payload that is neither an EasVoucherMaterialRequestVo nor an
     * EasVoucherInstorageRequestVo no URL is set.
     *
     * @param vo payload to send
     */
    public CallBackThread(T vo) {
        if (vo instanceof EasVoucherMaterialRequestVo) {
            this.url = PropertiesUtil.getConfigValueByKey("hshc.whapi.eas.material.url");
        } else if (vo instanceof EasVoucherInstorageRequestVo) {
            this.url = PropertiesUtil.getConfigValueByKey("hshc.whapi.eas.purchase.url");
        }
        log.info("CallBackThread.url:{}", url);
        this.vo = vo;
    }

    /**
     * Creates a task that sends the payload to an explicitly given URL.
     *
     * @param url target URL
     * @param vo  payload to send
     */
    public CallBackThread(String url, T vo) {
        this.url = url;
        this.vo = vo;
    }

    /**
     * Sends the payload to the URL and logs the returned result.
     */
    @Override
    public void run() {
        // String.valueOf keeps the logged text for non-null payloads and tolerates null.
        log.info(" ID:{} execute callbackFeignClient.recharge start [url:{},param:{}] ", Thread.currentThread().getId(), url, String.valueOf(vo));
        String result = RestHttpUtil.sendPost(url, vo);
        // Exactly two arguments for the two placeholders, so [result:{}] shows the result.
        log.info(" ID:{} execute callbackFeignClient.recharge end [result:{}] ", Thread.currentThread().getId(), result);
    }
}
1. 依存する jar:lombok;
2. :Queue msgQueue, , msgQueue , , , .