rpcシリーズ4-はタイムアウトシーンを処理する.及び提供hook

5522 ワード

質問:クライアントがリモートコールを開始し、サービス側が長時間返さない場合はどうしますか?
これは呼び出しタイムアウトの問題に関連し、通常、sqlクエリータイムアウト、httpリクエストタイムアウトなど、多くのシーンでタイムアウト時間を規定しています.サービス側メソッドの実行時間が所定のtimeout時間を超える場合、クライアントは現在の呼び出しを呼び出し、TimeoutExceptionを投げ出す必要がある.
さて、次はRpcBuidlerを改造し、タイムアウトの処理をサポートさせます.同様に、予想されるテストスキームと結果を先に示します.
//    UserService                :
public interface UserService {
    
    // other method
    
    /**
     *     
     */
    public boolean timeoutTest();
    
}
//   
public class UserServiceImpl implements UserService {
    
     // other method
    
    @Override
    public boolean timeoutTest() {
        try {
            //       
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {}
        return true;
    }
}

ClientTestでのテストコード:
    @Test
    public void timeoutTest(){
        long beginTime = System.currentTimeMillis();
        try {
            boolean result = userService.timeoutTest(); 
        } catch (Exception e) {
            long period = System.currentTimeMillis() - beginTime;
            System.out.println("period:" + period);
            Assert.assertTrue(period < 3100);
        }
    }

非同期メソッドの実装経験がありますが、このタイムアウト処理プロセスは非同期と非常に類似しており、Futureメカニズムを利用して実装されています.doInvokeメソッドを再構築し、非同期タスクを返します.
    private Future doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{

        //     FutureTask    
        Future retVal = (Future) handlerPool.submit(new Callable(){
            @Override
            public RpcResponse call() throws Exception {
                Object res = null;
                try{
                    //    ,       
                    Socket socket = new Socket(host,port);
                    try{
                        ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                        ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                        try{
                            //  
                            out.writeObject(request);
                            //  server      ---  
                            res = in.readObject();
                        }finally{
                            out.close();
                            in.close();
                        }
                    }finally{
                        socket.close();
                    }
                }catch(Exception e){
                    throw e;
                }
                return (RpcResponse)res;
            }
        });
        return retVal;
    }

コールバックメソッドinvokeは次のように変更されます.
    @Override
    public Object invoke(Object proxy, Method method,
            Object[] args) throws Throwable {
        //       ,    null
        if(asyncMethods.get().contains(method.getName())) return null;
        Object retVal = null;

        RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
        RpcResponse rpcResp  = null;
        try{
            Future response = doInvoke(request);
            //      
            rpcResp  = (RpcResponse)response.get(TIMEOUT,TimeUnit.MILLISECONDS);
        }catch(TimeoutException e){
            throw e;
        }catch(Exception e){}
        
        if(!rpcResp.isError()){
            retVal = rpcResp.getResponseBody();
        }else{
            throw new RpcException(rpcResp.getErrorMsg());
        }
        return retVal;
    }

このように改造された後,すべてのメソッド呼び出しはFutureによって結果を取得することが分かった.
Hookを提供し、開発者にRPCレベルのAOPを行わせる.
まず、タイトルが提供するHookインタフェースを見てみましょう.
public interface ConsumerHook {
    public void before(RpcRequest request);
    public void after(RpcRequest request);
}
//   
public class UserConsumerHook implements ConsumerHook{
    @Override
    public void before(RpcRequest request) {
        RpcContext.addAttribute("hook key","this is pass by hook");
    }

    @Override
    public void after(RpcRequest request) {
        System.out.println("I have finished Rpc calling.");
    }
}

hookが実現する機能は簡単で,クライアントがリモートコールを行う前後でbeforeとafterメソッドを実行する.
public final class RpcConsumer implements InvocationHandler{

    //。。。
    
    //  
    private ConsumerHook hook;

    public RpcConsumer hook(ConsumerHook hook){
        this.hook = hook;
        return this;
    }

static{
        userService = (UserService)consumer.targetHostPort(host, port)
                            .interfaceClass(UserService.class)
                            .timeout(TIMEOUT)
                            .hook(new UserConsumerHook())//    
                            .newProxy();
    }
//。。。
}

//UserServiceImpl      
public Map getMap() {
        Map newMap = new HashMap();
        newMap.put("name","getMap");
        newMap.putAll(RpcContext.getAttributes());
        return newMap;
}

フック関数を追加する実行ロジックをdoInvokeメソッドで開始するだけでよい.次のようになります.
    private Future doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
        //    
        hook.before(request);
        //。。。
}

同時にasyncCallとinvokeメソッドの最後にafterの実行ロジックを追加する.具体的な実装はソースコードを見ることができます.
githubソースを添付