簡単RPC機能を実現
7100 ワード
最近RMI RPCに興味があるので、自分で簡単な実現をしました.時間があれば、その後も改善し続けます.
RPCは主にサービス側とクライアントに分けられる.
サービス側の実現は以下の通りである.
クライアントは次のとおりです.
インタフェースとインプリメンテーションを書きます.
次はテストを開始できます.
RPCサービスを書き込み、起動
起動すると出力が表示されます
サービスの開始に成功しました...
RPCクライアントを書き込み、起動
クライアント出力が表示されます.
hello is_zhoufeng!
これにより、リモートコールが実現されます.
RPCは主にサービス側とクライアントに分けられる.
サービス側の実現は以下の通りである.
package com.zf.rpc.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
public class RPCServer {
private static final ExecutorService taskPool = Executors.newFixedThreadPool(50) ;
/**
*
* key: value:
*/
private static final ConcurrentHashMap serviceTargets =
new ConcurrentHashMap() ;
private static AtomicBoolean run = new AtomicBoolean(false) ;
/**
*
* @param service
*/
public void registService(Object service){
Class>[] interfaces = service.getClass().getInterfaces() ;
if(interfaces == null){
throw new IllegalArgumentException(" ");
}
Class> interfacez = interfaces[0] ;
String interfaceName = interfacez.getName() ;
serviceTargets.put(interfaceName, service) ;
}
/**
* Server
* @param port
*/
public void startServer(final int port){
Runnable lifeThread = new Runnable() {
@Override
public void run() {
ServerSocket lifeSocket = null ;
Socket client = null ;
ServiceTask serviceTask = null ;
try {
lifeSocket = new ServerSocket(port) ;
run.set(true) ;
while(run.get()){
client = lifeSocket.accept() ;
serviceTask = new ServiceTask(client);
serviceTask.accept() ;
}
} catch (IOException e) {
e.printStackTrace();
}
}
};
taskPool.execute(lifeThread) ;
System.out.println(" ...");
}
public void stopServer(){
run.set(false) ;
taskPool.shutdown() ;
}
public static final class ServiceTask implements Runnable{
private Socket client ;
public ServiceTask(Socket client){
this.client = client ;
}
public void accept(){
taskPool.execute(this) ;
}
@Override
public void run() {
InputStream is = null ;
ObjectInput oi = null ;
OutputStream os = null ;
ObjectOutput oo = null ;
try {
is = client.getInputStream() ;
os = client.getOutputStream() ;
oi = new ObjectInputStream(is);
String serviceName = oi.readUTF() ;
String methodName = oi.readUTF();
Class>[] paramTypes = (Class[]) oi.readObject() ;
Object[] arguments = (Object[]) oi.readObject() ;
System.out.println("serviceName:" + serviceName + " methodName:" + methodName);
Object targetService = serviceTargets.get(serviceName) ;
if(targetService == null){
throw new ClassNotFoundException(serviceName + " !") ;
}
Method targetMethod = targetService.getClass().getMethod(methodName, paramTypes) ;
Object result = targetMethod.invoke(targetService, arguments) ;
oo = new ObjectOutputStream(os) ;
oo.writeObject(result) ;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}finally{
try {
if(oo != null){
oo.close() ;
}
if(os != null){
os.close() ;
}
if(is != null){
is.close() ;
}
if(oi != null){
oi.close() ;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
クライアントは次のとおりです.
package com.zf.rpc.client;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
public class RPCClient {
/**
*
* @param
* @param host RPC IP
* @param port RPC
* @param serviceInterface
* @return
*/
@SuppressWarnings("unchecked")
public static T findService(final String host , final int port ,final Class serviceInterface){
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
Socket socket = null ;
InputStream is = null ;
OutputStream os = null ;
ObjectInput oi = null ;
ObjectOutput oo = null ;
try {
socket = new Socket(host, port) ;
os = socket.getOutputStream() ;
oo = new ObjectOutputStream(os);
oo.writeUTF(serviceInterface.getName()) ;
oo.writeUTF(method.getName()) ;
oo.writeObject(method.getParameterTypes()) ;
oo.writeObject(args);
is = socket.getInputStream() ;
oi = new ObjectInputStream(is) ;
return oi.readObject() ;
} catch (Exception e) {
System.out.println(" ...");
return null ;
}finally{
if(is != null){
is.close() ;
}
if(os != null){
is.close() ;
}
if(oi != null){
is.close() ;
}
if(oo != null){
is.close() ;
}
if(socket != null){
is.close() ;
}
}
}
});
}
}
インタフェースとインプリメンテーションを書きます.
package com.zf.rpc.test;
public interface IHelloWorld {
String sayHello(String name) ;
}
package com.zf.rpc.test;
public class HelloWorld implements IHelloWorld{
@Override
public String sayHello(String name) {
return "hello " + name + "!";
}
}
次はテストを開始できます.
RPCサービスを書き込み、起動
package com.zf.rpc.test;
import com.zf.rpc.server.RPCServer;
public class RPCServerTest {
public static void main(String[] args) {
RPCServer server = new RPCServer() ;
server.registService(new HelloWorld()) ;
server.startServer(8080) ;
}
}
起動すると出力が表示されます
サービスの開始に成功しました...
RPCクライアントを書き込み、起動
package com.zf.rpc.test;
import com.zf.rpc.client.RPCClient;
public class RPCClientTest {
public static void main(String[] args) {
IHelloWorld helloWorld =
RPCClient.findService("127.0.0.1" , 8080 , IHelloWorld.class) ;
String result = helloWorld.sayHello("is_zhoufeng");
System.out.println(result );
}
}
クライアント出力が表示されます.
hello is_zhoufeng!
これにより、リモートコールが実現されます.