JAva nioネットワークフレームワーク実装
6740 ワード
mavenプロジェクトhttps://github.com/solq360/commonチェーン符号化/復号化 リンク層チェーン処理 配管管理socket マルチプロトコル処理が非常に便利 netty NioEventLoopシングルスレッドシリアル処理 =========サポート機能:自動化符号化 rpcインタフェース強化 使用
簡単なチャットの例
server
TestNioServer
client
TestNioClient従来方式接続
サーバ接続
ソース実装プロセス
チェーンコーディングは、複数のICoderによる入出力変換処理 である. CoderParserクラス組立複数のICoder 符号化プロセッサ注意優先度 nio read -> packageCoder -> link coders -> handle handle write -> link coders -> packageCoder -> nio write ICoderParserManager管理呼び出し処理
コアdecodencode半包/帖包処理:AbstractISocketChannel doReadメソッドの要約は、復号から返された状態に従って処理される. 半パケット:完了状態でない場合、復号を継続し、最後のパケットインデックスから の処理を開始する.帖包:パケット復号移動パケットインデックスが完了すると、次の復号処理 に従う.
未完侍加
簡単なチャットの例
server
TestNioServer
// session
ISessionFactory sessionFactory = new SessionFactory();
// /
ICoderParserManager coderParserManager = new CoderParserManager();
// / ,
coderParserManager.register(CoderParser.valueOf("server chat", PackageDefaultCoder.valueOf(), new ChatTestServerHandle()));
// ServerSocket
ServerSocket serverSocket=ServerSocket.valueOf(SocketChannelConfig.valueOf(6969), 10,20,coderParserManager, sessionFactory);
//
serverSocket.start();
//
serverSocket.sync();
//
serverSocket.stop();
client
TestNioClient従来方式接続
// /
ICoderParserManager coderParserManager = new CoderParserManager();
// / ,
coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
// ClientSocket
final ClientSocket clientSocket = ClientSocket.valueOf(SocketChannelConfig.valueOf(6969), new SocketPool("client", null), coderParserManager, new EmptyHandle());
//
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
clientSocket.send(" ");
System.out.println("send ");
this.cancel();
}
}, 1000);
//
clientSocket.start();
//
clientSocket.sync();
//
clientSocket.stop();
サーバ接続
// session
ISessionFactory sessionFactory = new SessionFactory();
// /
ICoderParserManager coderParserManager = new CoderParserManager();
// / ,
coderParserManager.register(CoderParser.valueOf("chat", PackageDefaultCoder.valueOf(), new ChatHandle()));
// ClientSocket
final ServerSocket serverSocket = ServerSocket.valueOf(SocketChannelConfig.valueOf(8888), 10, 20, coderParserManager, sessionFactory);
//
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("registerClientSocket");
//
ClientSocket clientSocket = serverSocket.registerClient(SocketChannelConfig.valueOf(6969));
clientSocket.send(" ");
this.cancel();
}
}, 1000);
//
serverSocket.start();
//
serverSocket.sync();
//
serverSocket.stop();
ソース実装プロセス
チェーンコーディング
public interface ICoderParserManager {
/**
*
*
* @return CoderResult
* */
CoderResult decode(ByteBuffer buffer, ICoderCtx ctx);
/**
*
* */
ByteBuffer encode(Object message, ICoderCtx ctx);
void error(ByteBuffer buffer, ICoderCtx ctx);
/** / */
void register(CoderParser coderParser);
}
コアdecodencode
@Override
public CoderResult decode(ByteBuffer buffer, ICoderCtx ctx) {
final SocketChannelCtx socketChannelCtx = (SocketChannelCtx) ctx;
final ClientSocket clientSocket = socketChannelCtx.getClientSocket();
for (CoderParser coderParser : coderParsers.values()) {
final IPackageCoder packageCoder = coderParser.getPackageCoder();
final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
final IHandle handle = coderParser.getHandle();
Object value = null;
synchronized (buffer) {
//
if (socketChannelCtx.getCurrPackageIndex() >= buffer.limit()) {
return CoderResult.valueOf(ResultValue.UNFINISHED);
}
//
if (!packageCoder.verify(buffer, ctx)) {
continue;
}
//
value = packageCoder.decode(buffer, ctx);
if (value == null) {
//
return CoderResult.valueOf(ResultValue.UNFINISHED);
}
}
//
if (linkCoders != null) {
for (ICoder coder : linkCoders) {
value = coder.decode(value, ctx);
if (value == null) {
throw new CoderException(" : " + coder.getClass());
}
}
}
//
value = handle.decode(value, ctx);
clientSocket.readBefore(socketChannelCtx, value);
handle.handle(value, ctx);
clientSocket.readAfter(socketChannelCtx, value);
return CoderResult.valueOf(ResultValue.SUCCEED);
}
return CoderResult.valueOf(ResultValue.NOT_FIND_CODER);
}
@Override
public ByteBuffer encode(Object message, ICoderCtx ctx) {
for (CoderParser coderParser : coderParsers.values()) {
final IPackageCoder packageCoder = coderParser.getPackageCoder();
final ICoder<?, ?>[] linkCoders = coderParser.getCoders();
final IHandle handle = coderParser.getHandle();
//
if (!handle.verify(message, ctx)) {
continue;
}
//
Object value = handle.encode(message, ctx);
//
if (linkCoders != null) {
for (int i = linkCoders.length - 1; i >= 0; i--) {
ICoder coder = linkCoders[i];
value = coder.encode(value, ctx);
if (value == null) {
throw new CoderException(" : " + coder.getClass());
}
}
}
//
value = packageCoder.encode(value, ctx);
if (value != null) {
return (ByteBuffer) value;
}
throw new CoderException(" :" + packageCoder.getClass());
}
throw new CoderException(" / ");
}
boolean run = true;
//
while (run) {
ByteBuffer cpbuffer = socketChannelCtx.coderBegin();
cpbuffer.mark();
CoderResult coderResult = coderParserManager.decode(cpbuffer, socketChannelCtx);
switch (coderResult.getValue()) {
case SUCCEED:
break;
case NOT_FIND_CODER:
final int readySize = socketChannelCtx.getWriteIndex() - socketChannelCtx.getCurrPackageIndex();
final int headLimit = 255;
if (readySize >= headLimit) {
throw new CoderException(" / ");
}
run = false;
break;
case UNFINISHED:
case UNKNOWN:
case ERROR:
default:
run = false;
// TODO throw
break;
}
}
未完侍加