簡単RPC機能を実現

7100 ワード

最近RMI RPCに興味があるので、自分で簡単な実現をしました.時間があれば、その後も改善し続けます.
RPCは主にサービス側とクライアントに分けられる. 
サービス側の実現は以下の通りである.
package com.zf.rpc.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;


public class RPCServer {
	
	private static final ExecutorService taskPool = Executors.newFixedThreadPool(50) ;

	/**
	 *        
	 * key:       value:    
	 */
	private static final ConcurrentHashMap serviceTargets = 
		new ConcurrentHashMap() ;

	private static AtomicBoolean run = new AtomicBoolean(false) ;
	
	/**
	 *     
	 * @param service
	 */
	public void registService(Object service){
		Class>[] interfaces = service.getClass().getInterfaces() ;
		if(interfaces == null){
			throw new IllegalArgumentException("          "); 
		}
		Class> interfacez = interfaces[0] ;
		String interfaceName = interfacez.getName() ;
		serviceTargets.put(interfaceName, service) ;
	}

	/**
	 *   Server
	 * @param port
	 */
	public void startServer(final int port){
		Runnable lifeThread = new Runnable() {
			@Override
			public void run() {
				ServerSocket lifeSocket = null ;
				Socket client = null ;
				ServiceTask serviceTask = null ;
				try {
					lifeSocket = new ServerSocket(port) ;
					run.set(true) ;
					while(run.get()){
						client = lifeSocket.accept() ;
						serviceTask = new ServiceTask(client); 
						serviceTask.accept() ;
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		};
		taskPool.execute(lifeThread) ;  
		System.out.println("      ...");
	}
	
	public void stopServer(){
		run.set(false) ;
		taskPool.shutdown() ;
	}

	public static final class ServiceTask implements Runnable{

		private Socket client  ;
		
		public ServiceTask(Socket client){
			this.client = client ;
		}
		
		public void accept(){
			taskPool.execute(this) ;
		}

		@Override
		public void run() {
			InputStream is = null ;
			ObjectInput oi = null ;
			OutputStream os = null ;
			ObjectOutput oo = null ;
			try {
				is = client.getInputStream() ;
				os = client.getOutputStream() ;
				oi = new ObjectInputStream(is);
				String serviceName = oi.readUTF() ;
				String methodName = oi.readUTF();
				Class>[] paramTypes =  (Class[]) oi.readObject() ;  
				Object[] arguments = (Object[]) oi.readObject() ;
				System.out.println("serviceName:" + serviceName + " methodName:" + methodName);
				Object targetService = serviceTargets.get(serviceName) ;
				if(targetService == null){
					throw new ClassNotFoundException(serviceName + "     !") ;
				}
				
				Method targetMethod = targetService.getClass().getMethod(methodName, paramTypes) ;
				Object result = targetMethod.invoke(targetService, arguments) ;
				
				oo = new ObjectOutputStream(os) ;
				oo.writeObject(result) ;
			} catch (IOException e) {
				e.printStackTrace();
			} catch (ClassNotFoundException e) {
				e.printStackTrace();
			} catch (SecurityException e) {
				e.printStackTrace();
			} catch (NoSuchMethodException e) {
				e.printStackTrace();
			} catch (IllegalArgumentException e) {
				e.printStackTrace();
			} catch (IllegalAccessException e) {
				e.printStackTrace();
			} catch (InvocationTargetException e) {
				e.printStackTrace();
			}finally{
				try {
					if(oo != null){
						oo.close() ;
					}
					if(os != null){
						os.close() ;
					}
					if(is != null){
						is.close() ;
					}
					if(oi != null){
						oi.close() ;
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

	}


}

クライアントは次のとおりです.
package com.zf.rpc.client;

import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;



public class RPCClient {

	/**
	 *                
	 * @param 
	 * @param host  RPC   IP
	 * @param port  RPC    
	 * @param serviceInterface      
	 * @return          
	 */
	@SuppressWarnings("unchecked")
	public static  T findService(final String host , final int port ,final Class serviceInterface){
		return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
			@Override
			public Object invoke(final Object proxy, final Method method, final Object[] args)
			throws Throwable {
				Socket socket = null ;
				InputStream is = null ;
				OutputStream os = null ;
				ObjectInput oi = null ;
				ObjectOutput oo = null ;
				try {
					socket = new Socket(host, port) ;
					os = socket.getOutputStream() ;
					oo = new ObjectOutputStream(os);
					oo.writeUTF(serviceInterface.getName()) ;
					oo.writeUTF(method.getName()) ;
					oo.writeObject(method.getParameterTypes()) ;
					oo.writeObject(args);

					is = socket.getInputStream() ;
					oi = new ObjectInputStream(is) ;
					return oi.readObject() ;
				} catch (Exception e) {
					System.out.println("      ...");
					return null ;
				}finally{
					if(is != null){
						is.close() ;
					}
					if(os != null){
						is.close() ;
					}
					if(oi != null){
						is.close() ;
					}
					if(oo != null){
						is.close() ;
					}
					if(socket != null){
						is.close() ;
					}
				}
			}
		}); 
	}

}

インタフェースとインプリメンテーションを書きます.
package com.zf.rpc.test;

public interface IHelloWorld {

	String sayHello(String name) ;
	
}
package com.zf.rpc.test;

public class HelloWorld implements IHelloWorld{

	@Override
	public String sayHello(String name) {
		return "hello " + name + "!";
	}

}

次はテストを開始できます. 
RPCサービスを書き込み、起動
package com.zf.rpc.test;

import com.zf.rpc.server.RPCServer;

public class RPCServerTest {
	
	public static void main(String[] args) {
		
		RPCServer server = new RPCServer() ;
		server.registService(new HelloWorld()) ;
		server.startServer(8080) ;
		
	}

}

起動すると出力が表示されます
サービスの開始に成功しました...
RPCクライアントを書き込み、起動
package com.zf.rpc.test;

import com.zf.rpc.client.RPCClient;

public class RPCClientTest {

	public static void main(String[] args) {

		IHelloWorld helloWorld = 
			RPCClient.findService("127.0.0.1" , 8080 , IHelloWorld.class) ;
		String	result = helloWorld.sayHello("is_zhoufeng");
		System.out.println(result );

	}

}

クライアント出力が表示されます.
hello is_zhoufeng!
これにより、リモートコールが実現されます.