rpcシリーズ4-はタイムアウトシーンを処理する.及び提供hook
5522 ワード
質問:クライアントがリモートコールを開始し、サービス側が長時間返さない場合はどうしますか?
これは呼び出しタイムアウトの問題に関連し、通常、sqlクエリータイムアウト、httpリクエストタイムアウトなど、多くのシーンでタイムアウト時間を規定しています.サービス側メソッドの実行時間が所定のtimeout時間を超える場合、クライアントは現在の呼び出しを呼び出し、TimeoutExceptionを投げ出す必要がある.
さて、次はRpcBuidlerを改造し、タイムアウトの処理をサポートさせます.同様に、予想されるテストスキームと結果を先に示します.
ClientTestでのテストコード:
非同期メソッドの実装経験がありますが、このタイムアウト処理プロセスは非同期と非常に類似しており、Futureメカニズムを利用して実装されています.doInvokeメソッドを再構築し、非同期タスクを返します.
コールバックメソッドinvokeは次のように変更されます.
このように改造された後,すべてのメソッド呼び出しはFutureによって結果を取得することが分かった.
Hookを提供し、開発者にRPCレベルのAOPを行わせる.
まず、タイトルが提供するHookインタフェースを見てみましょう.
hookが実現する機能は簡単で,クライアントがリモートコールを行う前後でbeforeとafterメソッドを実行する.
フック関数を追加する実行ロジックをdoInvokeメソッドで開始するだけでよい.次のようになります.
同時にasyncCallとinvokeメソッドの最後にafterの実行ロジックを追加する.具体的な実装はソースコードを見ることができます.
githubソースを添付
これは呼び出しタイムアウトの問題に関連し、通常、sqlクエリータイムアウト、httpリクエストタイムアウトなど、多くのシーンでタイムアウト時間を規定しています.サービス側メソッドの実行時間が所定のtimeout時間を超える場合、クライアントは現在の呼び出しを呼び出し、TimeoutExceptionを投げ出す必要がある.
さて、次はRpcBuidlerを改造し、タイムアウトの処理をサポートさせます.同様に、予想されるテストスキームと結果を先に示します.
// UserService :
public interface UserService {
// other method
/**
*
*/
public boolean timeoutTest();
}
//
public class UserServiceImpl implements UserService {
// other method
@Override
public boolean timeoutTest() {
try {
//
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {}
return true;
}
}
ClientTestでのテストコード:
@Test
public void timeoutTest(){
long beginTime = System.currentTimeMillis();
try {
boolean result = userService.timeoutTest();
} catch (Exception e) {
long period = System.currentTimeMillis() - beginTime;
System.out.println("period:" + period);
Assert.assertTrue(period < 3100);
}
}
非同期メソッドの実装経験がありますが、このタイムアウト処理プロセスは非同期と非常に類似しており、Futureメカニズムを利用して実装されています.doInvokeメソッドを再構築し、非同期タスクを返します.
private Future doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
// FutureTask
Future retVal = (Future) handlerPool.submit(new Callable(){
@Override
public RpcResponse call() throws Exception {
Object res = null;
try{
// ,
Socket socket = new Socket(host,port);
try{
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
try{
//
out.writeObject(request);
// server ---
res = in.readObject();
}finally{
out.close();
in.close();
}
}finally{
socket.close();
}
}catch(Exception e){
throw e;
}
return (RpcResponse)res;
}
});
return retVal;
}
コールバックメソッドinvokeは次のように変更されます.
@Override
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
// , null
if(asyncMethods.get().contains(method.getName())) return null;
Object retVal = null;
RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
RpcResponse rpcResp = null;
try{
Future response = doInvoke(request);
//
rpcResp = (RpcResponse)response.get(TIMEOUT,TimeUnit.MILLISECONDS);
}catch(TimeoutException e){
throw e;
}catch(Exception e){}
if(!rpcResp.isError()){
retVal = rpcResp.getResponseBody();
}else{
throw new RpcException(rpcResp.getErrorMsg());
}
return retVal;
}
このように改造された後,すべてのメソッド呼び出しはFutureによって結果を取得することが分かった.
Hookを提供し、開発者にRPCレベルのAOPを行わせる.
まず、タイトルが提供するHookインタフェースを見てみましょう.
public interface ConsumerHook {
public void before(RpcRequest request);
public void after(RpcRequest request);
}
//
public class UserConsumerHook implements ConsumerHook{
@Override
public void before(RpcRequest request) {
RpcContext.addAttribute("hook key","this is pass by hook");
}
@Override
public void after(RpcRequest request) {
System.out.println("I have finished Rpc calling.");
}
}
hookが実現する機能は簡単で,クライアントがリモートコールを行う前後でbeforeとafterメソッドを実行する.
public final class RpcConsumer implements InvocationHandler{
//。。。
//
private ConsumerHook hook;
public RpcConsumer hook(ConsumerHook hook){
this.hook = hook;
return this;
}
static{
userService = (UserService)consumer.targetHostPort(host, port)
.interfaceClass(UserService.class)
.timeout(TIMEOUT)
.hook(new UserConsumerHook())//
.newProxy();
}
//。。。
}
//UserServiceImpl
public Map getMap() {
Map newMap = new HashMap();
newMap.put("name","getMap");
newMap.putAll(RpcContext.getAttributes());
return newMap;
}
フック関数を追加する実行ロジックをdoInvokeメソッドで開始するだけでよい.次のようになります.
private Future doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
//
hook.before(request);
//。。。
}
同時にasyncCallとinvokeメソッドの最後にafterの実行ロジックを追加する.具体的な実装はソースコードを見ることができます.
githubソースを添付