【オリジナル】同期回転非同期+RPCのPOS業界応用-キーテクノロジー実現
詳細
簡単に業務モデルを振り返る:レジ<=>POSエージェントサーバー<=>POS機、3者の間で通信を行い、POSエージェントはメッセージ変換と同期として非同期ロールを回転する.以下にいくつかの重要な技術の実現を紹介する:1、メッセージここのメッセージは、POSエージェントサーバとPOS通信の間で約束されたメッセージを指す.POS取引タイプ、支払い、カード決済、印刷などによって、各取引メッセージにはどのようなフィールド情報と長さが含まれているかを約束し、その中の1つの比較特別なフィールドはUUIDであり、このフィールドは各メッセージのキーフィールドであり、一意性があり、各メッセージは異なり、主に同期回転非同期を実現するために用いられる.POSは、エージェントサーバにデータを返して元のコマンドを送信したチャネルを取り戻し、最終的に変換したデータをレジに送信する.元のチャネルを見つけるのは,同期転送非同期の過程でチャネルが一時的に保存されているからである.2、同期回転非同期キー
3、POSプロキシサーバーはRPC呼び出しインタフェースのキーコードを暴露する
簡単に業務モデルを振り返る:レジ<=>POSエージェントサーバー<=>POS機、3者の間で通信を行い、POSエージェントはメッセージ変換と同期として非同期ロールを回転する.以下にいくつかの重要な技術の実現を紹介する:1、メッセージここのメッセージは、POSエージェントサーバとPOS通信の間で約束されたメッセージを指す.POS取引タイプ、支払い、カード決済、印刷などによって、各取引メッセージにはどのようなフィールド情報と長さが含まれているかを約束し、その中の1つの比較特別なフィールドはUUIDであり、このフィールドは各メッセージのキーフィールドであり、一意性があり、各メッセージは異なり、主に同期回転非同期を実現するために用いられる.POSは、エージェントサーバにデータを返して元のコマンドを送信したチャネルを取り戻し、最終的に変換したデータをレジに送信する.元のチャネルを見つけるのは,同期転送非同期の過程でチャネルが一時的に保存されているからである.2、同期回転非同期キー
public class PosResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private String uuid;//
//psoresponse
private final static Map futures = new ConcurrentHashMap();
private final static Object synLock = new Object();
public I write2pos(boolean broadcastFlag,MsgRequest msg) throws PosConnException,TimeOutException,TryLaterException {
synchronized(synLock)
{
long st = System.currentTimeMillis();
lock.lock();
try {
this.uuid = msg.getId();
futures.put(this.uuid, this);//
// pos
log.debug(" POS :{}",msg);
PosIntContext.write2pos(msg);
int timeout = PosIntContext.getApiTimeout();
if (msg.getTimeout()!=-1)
{
timeout = msg.getTimeout();
log.debug(" :{}",timeout);
}
//
// , , POS
// , POS ,
done.await(timeout,TimeUnit.SECONDS);
if (!isDone())
{
throw new TimeOutException(" ("+timeout+" )");
}
} catch (InterruptedException e) {
log.error("write2pos InterruptedException: "+e.getMessage());
throw new PosConnException(e);
} catch (TimeOutException e) {
throw e;
} catch (PosConnException e) {
throw e;
} catch (TryLaterException e) {
throw e;
}
finally {
this.release();
lock.unlock();
}
long en = System.currentTimeMillis();
log.debug("{} :{}",msg.toString(),(en-st));
//POS ,
if (response instanceof MsgResponse)
{
return (I)response;
}
return null;
}
}
/**
* pos
* @Title: received
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
public static void received(MsgResponse response) {
//
PosResponseFuture> future = futures.remove(response.getId());
if (future != null) {
future.doReceived(response);
}
}
/**
*
* @Title: isDone
* @Description: TODO
* @param @return
* @return boolean
* @throws
*/
private boolean isDone() {
return this.response != null;//null
}
/**
*
* @Title: doReceived
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
private void doReceived(MsgResponse response) {
lock.lock();// ,
try {
this.response = response;
done.signal();//notify,
} finally {
lock.unlock();
}
}
/**
*
* @Title: release
* @Description: TODO
* @param
* @return void
* @throws
*/
private void release()
{
PosResponseFuture tmp = futures.remove(this.uuid);
if (tmp!=null)
{
log.debug(" :{}",new Object[]{this.uuid,tmp.getProcessMsg()});
}
else
{
log.debug(" :NULL!");
}
}
public static void main(String args[])
{
}
}
3、POSプロキシサーバーはRPC呼び出しインタフェースのキーコードを暴露する
public class Client {
// rpc
@SuppressWarnings("unchecked")
public T getProxy(Class interfaceClass,final String host,final int port) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class>[] { interfaceClass }, new InvocationHandler() {
// AOP
public Object invoke(Object proxy, Method method,Object[] arguments) throws Throwable {
Socket socket = null;
ObjectOutputStream output = null;
ObjectInputStream input = null;
try
{
// 、 , RPC
//
socket = new Socket(host, port);
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(arguments);
input = new ObjectInputStream(socket.getInputStream());
return input.readObject();
}
catch(Exception e)
{
throw e;
}
finally
{
if (socket!=null)
{
socket.close();
}
if (output!=null)
{
output.close();
}
if (input!=null)
{
input.close();
}
}
}
});
}
public static void main(String args[])
{
HelloService helloService = new Client().getProxy(HelloService.class,"localhost",8080);
long st = System.currentTimeMillis();
for (int i=0; i<1; i++)
{
System.out.println(i+"> "+helloService.sayHello(" "));
}
long en = System.currentTimeMillis();
System.out.println(" :"+(en-st));
}
}
public class Server {
private int port = 8888;
public void rpcServer()
throws Exception
{
ServerSocket server = null;
try
{
server = new ServerSocket(port);
for(;;)
{
final Socket socket = server.accept();
System.out.println(socket.getRemoteSocketAddress());
new Thread(new Runnable() {
@Override
public void run() {
ObjectOutputStream output = null;
ObjectInputStream input = null;
try
{
input = new ObjectInputStream(socket.getInputStream());// rpc client
String methodName = input.readUTF();//
Class>[] parameterTypes = (Class>[])input.readObject();
Object[] arguments = (Object[])input.readObject();//
output = new ObjectOutputStream(socket.getOutputStream());
Method method = new HelloServiceImp().getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(new HelloServiceImp(), arguments);//
output.writeObject(result);//
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
try
{
if (output!=null)
{
output.close();
}
if (input!=null)
{
input.close();
}
}
catch(Exception e)
{
}
}
}
}).start();
}
}
catch(Exception e)
{
throw e;
}
finally
{
if (server!=null)
{
server.close();
}
}
}
public static void main(String args[]) throws Exception
{
new Server().rpcServer();
}
}
public interface HelloService {
public String sayHello(String input);
}
public class HelloServiceImp implements HelloService {
@Override
public String sayHello(String input) {
return input + " wellcome.";
}
}