AsyncHttpClientを使用した非同期リクエストの送信
14780 ワード
背景
同期要求をHttpClientで送信し,同時量が大きい場合にHttpClientの接続プールを用いて性能を向上させることが知られている.この方法は効果的ですが、アクセス量が極めて大きい場合やネットワークが悪い場合、ネットワークリクエストが遅く、他のリクエストがブロックされる場合もあります.したがって、ネットワークリクエストを他のリクエストに影響を与えることなく、非同期のリクエストに変更することができます.
では、非同期リクエストはどのように送信されますか?非同期リクエストはAsyncHttpClientクラスを使用して実装できます.
主な方法:
コールバック方法:
【備考】各メソッドの先頭で
callbackの結果を取得するには、次の手順に従います.
次に、テストクラスを新規作成して、効果を確認します.
同期要求をHttpClientで送信し,同時量が大きい場合にHttpClientの接続プールを用いて性能を向上させることが知られている.この方法は効果的ですが、アクセス量が極めて大きい場合やネットワークが悪い場合、ネットワークリクエストが遅く、他のリクエストがブロックされる場合もあります.したがって、ネットワークリクエストを他のリクエストに影響を与えることなく、非同期のリクエストに変更することができます.
では、非同期リクエストはどのように送信されますか?非同期リクエストはAsyncHttpClientクラスを使用して実装できます.
主な方法:
/**
*
*/
@Component
public class AbcHttpClientService implements InitializingBean {
private final static String ABC_URL_PREFIX = "http://www.abc.com/";
private CloseableHttpAsyncClient httpAsyncClient; // httpclient
private volatile RequestConfig config; //
@Autowired
private ILogService logService;
@Override
public void afterPropertiesSet() throws Exception {
init();
}
/**
*
* @param reqParamMap
* @return
*/
public AbcHttpFutureCallback doGet(Map reqParamMap){
CloseableHttpAsyncClient httpAsyncClient = getHttpClient();
String reqUrl = HttpUtil.buildUrl(ABC_URL_PREFIX,null,reqParamMap); // URL
// post , url post
HttpPost httpPost = new HttpPost(reqUrl); // Abc URL
httpPost.setConfig(config);
long begin = System.currentTimeMillis();
AbcHttpFutureCallback callback = null;
try{
CountDownLatch latch = new CountDownLatch(1);
callback = new AbcHttpFutureCallback(JSON.toJSONString(reqParamMap),reqUrl,begin, this.config.getConnectTimeout(), latch,
ThreadLocalHolder.getThreadHolder(), logService);
// , ( )
httpAsyncClient.execute(httpPost,callback);
}catch(Exception e){
//
String param = JSON.toJSONString(reqParamMap);
String logStr = AdamExceptionUtils.getStackTrace(e);
if (TimeoutUtil.isTimeOut(e)) {
logService.sendOverTimeAccountLog(param, logStr, reqUrl, "Abc ");
} else {
logService.sendTechnologyErrorAccountLog(param, logStr, reqUrl, "Abc ");
}
}
return callback;
}
private void init() throws Exception {
if (null != this.httpAsyncClient) {
return;
}
refresh();
}
public synchronized void refresh() {
String AbcServiceTimeout = ConfigContainer.getProperty(Constants.ABC_SERVICE_TIMEOUT);
Integer timeout = 80;
if (StringHelper.isNumber(AbcServiceTimeout)) {
timeout = Integer.valueOf(AbcServiceTimeout);
}
this.config = RequestConfig.custom().setSocketTimeout(timeout).setConnectTimeout(timeout)
.setConnectionRequestTimeout(timeout).build();
try {
this.httpAsyncClient = AsynHttpClientHelper.initAsynHttpClient(timeout, false);
} catch (Exception e) {
this.httpAsyncClient = null;
}
}
/**
* httpclient
*/
private CloseableHttpAsyncClient getHttpClient() {
if (null == this.httpAsyncClient) {
for (int i = 0; i < 3; i++) {
try {
init();
} catch (Exception e) {
String logStr = AdamExceptionUtils.getStackTrace(e);
logService.sendTechnologyErrorAccountLog("", logStr, "", "Abc getHttpClient ");
}
}
}
return this.httpAsyncClient;
}
}
public class AsynHttpClientHelper {
public static CloseableHttpAsyncClient initAsynHttpClient(int timeout, boolean isSetTimeout) throws Exception {
// http socket
Registry sessionStrategyRegistry = RegistryBuilder. create().register("http", NoopIOSessionStrategy.INSTANCE).build();
//
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
//
PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(ioReactor, sessionStrategyRegistry);
MessageConstraints messageConstraints = MessageConstraints.custom().setMaxHeaderCount(200).setMaxLineLength(20000).build();
ConnectionConfig connectionConfig = ConnectionConfig.custom().setMalformedInputAction(CodingErrorAction.IGNORE).setUnmappableInputAction(CodingErrorAction.IGNORE).setCharset(Consts.UTF_8).setMessageConstraints(messageConstraints).build();
connectionManager.setDefaultConnectionConfig(connectionConfig);
//
connectionManager.setMaxTotal(500); //
connectionManager.setDefaultMaxPerRoute(500); //
CloseableHttpAsyncClient httpAsyncClient = null;
if (isSetTimeout) {
//setConnectTimeout: ,
//setConnectionRequestTimeout: connect Manager( ) Connection , 。
//setSocketTimeout: ( ), 。
RequestConfig config = RequestConfig.custom().setSocketTimeout(timeout).setConnectTimeout(timeout).setConnectionRequestTimeout(timeout).build();
httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(config).build();
} else {
httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(connectionManager).build();
}
httpAsyncClient.start();
return httpAsyncClient;
}
}
コールバック方法:
// , httpclient
public abstract class AbcFutureCallback implements FutureCallback {
/**
*
*/
protected ThreadHolder threadHolder = new ThreadHolder();
public void setThreadHolder(ThreadHolder threadHolder) {
this.threadHolder.copy(threadHolder);
}
public ThreadHolder getThreadHolder() {
return threadHolder;
}
}
/**
*
*/
public class AbcHttpFutureCallback extends AbcFutureCallback {
private String url;
private String param;
private long starttime;
private long timeout;
private String responseString;
private ILogService logService;
private CountDownLatch countDownLatch;
private AbcResponse AbcResponse;
public AbcHttpFutureCallback(String param, String url, long starttime, long timeout, CountDownLatch countDownLatch,
ThreadHolder threadHolder, ILogService logService){
this.param = param;
this.url = url;
this.starttime = starttime;
this.timeout = timeout;
this.countDownLatch = countDownLatch;
this.logService = logService;
setThreadHolder(threadHolder);
}
@Override
public void completed(HttpResponse response) {
ThreadLocalHolder.setThreadHolder(threadHolder);
HttpEntity entity = response.getEntity();
byte[] bytes = null;
//
try{
Header contentEncodingHeader = response.getFirstHeader("Content-Encoding");
if (contentEncodingHeader != null && contentEncodingHeader.getValue().toLowerCase().indexOf("gzip") > -1) {
bytes = EntityUtils.toByteArray(new GzipDecompressingEntity(entity));
} else {
bytes = EntityUtils.toByteArray(response.getEntity());
}
responseString = new String(bytes, "utf-8");
}catch (Exception e){
responseString = String.format("Abc entity error: bytes: %s error msg: %s", bytes,
AdamExceptionUtils.getStackTrace(e));
}
if (null == responseString || isSuccess(response.getStatusLine().getStatusCode()) == false) {
if (null != logService) {
logService.sendErrorAccountLog("Abc : ");
}
responseString = responseString == null?"Abc " : responseString;
} else {
responseString = responseString.replace("'", "'");
responseString = responseString.replace("&", "&");
responseString = responseString.replace(" ", " ");
try{
AbcResponse = JsonUtil.fromStr(responseString,AbcResponse.class);
}catch (Exception ex){
String logStr = AdamExceptionUtils.getStackTrace(ex);
logService.sendOverTimeAccountLog(param, logStr, url, "Abc ");
}
}
//
if (logService.isNeedLog()) {
long end = System.currentTimeMillis();
RequestLogEntity requestLogEntity = new RequestLogEntity();
requestLogEntity.setUrl(url);
requestLogEntity.setHeader("");
requestLogEntity.setRequest(param);
requestLogEntity.setResponse(responseString);
requestLogEntity.setUseTime(end - starttime);
logService.sendRequestLog(requestLogEntity);
}
countDownLatch.countDown(); // countDownLatch
}
/**
*
* @param ex
*/
@Override
public void failed(Exception ex) {
ThreadLocalHolder.setThreadHolder(threadHolder);
String logStr = AdamExceptionUtils.getStackTrace(ex);
if (TimeoutUtil.isTimeOut(ex)) {
logService.sendOverTimeAccountLog(param, logStr, url, " Abc ");
} else {
logService.sendTechnologyErrorAccountLog(param, logStr, url, " Abc ");
}
}
/**
*
*/
@Override
public void cancelled() {
ThreadLocalHolder.setThreadHolder(threadHolder);
logService.sendTechnologyErrorAccountLog(param, "canceled", url, " Abc ");
}
public ILogService getLogService() {
return logService;
}
public long getTimeout() {
return timeout;
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public AbcResponse getAbcResponse() {
return AbcResponse;
}
public void setAbcResponse(AbcResponse AbcResponse) {
this.AbcResponse = AbcResponse;
}
private static boolean isSuccess(int respStatusCode) {
if (respStatusCode == HttpStatus.SC_OK || respStatusCode == HttpStatus.SC_CREATED) {
return true;
} else {
return false;
}
}
}
@Component
public class LogService implements ILogService {
//
private boolean isNeedInfoLog() {
if (null == ThreadLocalHolder.getRequestLogFlag()) {
return false;
}
return ThreadLocalHolder.getRequestLogFlag() >= 2;
}
/**
* 0 log,1
*/
private boolean isNeedErrorLog() {
if (null == ThreadLocalHolder.getRequestLogFlag()) {
return false;
}
return ThreadLocalHolder.getRequestLogFlag() >= 1;
}
}
public class ThreadLocalHolder {
private static ThreadLocal contextHolder = new ThreadLocal();
public ThreadLocalHolder() {
}
public static Integer getRequestLogFlag() {
if (null == contextHolder.get()) {
initRunningAccount();
}
return ((ThreadHolder)contextHolder.get()).getRequestLogFlag();
}
public static void setRequestLogFlag(Integer requestLogFlag) {
if (null == contextHolder.get()) {
initRunningAccount();
}
((ThreadHolder)contextHolder.get()).setRequestLogFlag(requestLogFlag);
}
public static ThreadHolder getThreadHolder() {
return (ThreadHolder)contextHolder.get();
}
public static void setThreadHolder(ThreadHolder threadHolder) {
contextHolder.set(threadHolder);
}
//
}
【備考】各メソッドの先頭で
ThreadLocalHolder.setThreadHolder(threadHolder);
操作を行う理由は、非同期呼び出しがマルチスレッドを別途開いて操作するため、コールバックを行う際にログの記録に現在のスレッドの情報が必要となるため、ThreadLocalHolder.setThreadHolder(threadHolder)に現在のスレッドを示すためである.実は主にログを記録するためです.callbackの結果を取得するには、次の手順に従います.
public class AbcHttpResultGetter{
protected AbcFutureCallback futureCallback;
public void setFutureCallback(AdsFutureCallback futureCallback) {
this.futureCallback = futureCallback;
}
@Override
protected AbcResponse getResultFromCallback() {
if (null == this.futureCallback || !(this.futureCallback instanceof AbcHttpFutureCallback)) {
return null;
}
AbcHttpFutureCallback callback = (AbcHttpFutureCallback)this.futureCallback;
CountDownLatch latch = callback.getCountDownLatch();
ILogService logService = callback.getLogService();
try{
// , latch 0,
if (!latch.await(callback.getTimeout(), TimeUnit.MILLISECONDS)) {
// failed()
callback.failed(new RuntimeException("Abc CountDownLatch wait timeout"));
}
return callback.getAbcResponse();
}catch (Exception e){
if (null != logService) {
logService.sendErrorAccountLog(" Abc CountDownLatch wait error:" + AdamExceptionUtils.getStackTrace(e));
}
return null;
}
}
}
次に、テストクラスを新規作成して、効果を確認します.
public class AbcHttpClientServiceTest {
@Mock
private ILogService logService;
@InjectMocks
private AbcHttpClientService AbcHttpClientService;
public AbcHttpClientServiceTest() {
ThreadLocalHolder.setThreadHolder(new ThreadHolder());
// debug,
AdsSystemPropertyContainer.addSysConfig(Constants.ABC_SERVICE_TIMEOUT, "400000");
MockitoAnnotations.initMocks(this);
}
@Test
public void doGetTest() throws InterruptedException {
Map reqParamMap = new HashMap<>();
reqParamMap.put("user", "f8b99fd6-e6f7-375f-9532-772a86e55c");
reqParamMap.put("userid", "207887422");
AbcHttpFutureCallback callback = AbcHttpClientService.doGet(reqParamMap);
AbcHttpResultGetter resultGetter = new AbcHttpResultGetter();
resultGetter.setFutureCallback(callback);
AbcResponse result = resultGetter.getResultFromCallback();
System.out.println(rresult.toString());
}
}