Java tcpサーバの基礎フレームワーク
その中で主に検討したいのは、接続を傍受するAcceptorReactorクラス、1つの傍受データが到着したSessionReactorクラス、1つのサーバ断主制御クラスServerManager、1つの制御データの送信、受信、ユーザー情報の格納を制御するSessionクラスである.
サーバが実行されている間は、mainプライマリスレッドと傍受接続スレッド、クライアントデータの到着を傍受するスレッドの3つのスレッドしか走っていません.クライアントデータが到着すると、スレッド処理が別途開かれ、処理が終了すると破棄されます.
使用する場合は、自己書き込みクラス継承ServerManagerが自己serverの機能を実現する必要があり、書き込みクラス継承Sessionが自己データ処理を実現する必要があり、server.propertiesでは,サーバポート番号,クライアントデータ符号化,ロードが必要なSessionクラス(すなわち,Sessionから継承された自分で書いたクラス),受信データを送信する際のデータセパレータを構成する.
その中のReactorモードはネット上を参考にしていて、具体的なサイトはもう忘れています.
AcceptorReactorクラス:
ここで,runメソッドには接続傍受が登録され,querySelectorでは接続要求がキャプチャされ,Acceptorのrunでは傍受の処理が実現される.
SessionReactorクラス:
ここで、registerSessionには受信データを登録する必要があるSessionオブジェクトが用意されており、querySelectorにおけるsynchronized(preparedSessions){バーが用意したSessionオブジェクトは受信データ傍受として登録されており、その後受信データを処理する要求が処理され、Readerは受信データを処理するクラスである.
すべてのコードがアップロードされました.パッケージzys.net.tcpはコアクラス、パッケージサーバはテストクラス、機能はクライアント接続後1秒おきにクライアントに文字列変数COST_を送信するPARAMS_STRの値は、クライアントデータの事実更新を実現するため、送信形式は10ビットのデータ長を表す数字(例えば0000000013)、5ビットのデータ種別を表す数字(例えば20000、毎秒送信の同期データを表す)、続いて実際のデータである.telnet xxx.xxx.xxx.xxx.xxx.xxx 9999でテストすることができる.
すべてのコアコードおよびテストコードがアップロードされ、utf-8が符号化されています.
ユーザのインタラクションが頻繁な場合、サーバのオープンスレッドが多すぎてサーバの効率が低下するのではないかと心配されていますが、100クライアントの場合、データのインタラクションは正常で、遅延は少しも見えません.失敗したのはzysです.ThreadRunnable類は、いくつかの場所で引用されています.この類はズボンを脱いでおならをします.
批判を歓迎して、提案して、[email protected]に送ってください、ありがとうございます!
サーバが実行されている間は、mainプライマリスレッドと傍受接続スレッド、クライアントデータの到着を傍受するスレッドの3つのスレッドしか走っていません.クライアントデータが到着すると、スレッド処理が別途開かれ、処理が終了すると破棄されます.
使用する場合は、自己書き込みクラス継承ServerManagerが自己serverの機能を実現する必要があり、書き込みクラス継承Sessionが自己データ処理を実現する必要があり、server.propertiesでは,サーバポート番号,クライアントデータ符号化,ロードが必要なSessionクラス(すなわち,Sessionから継承された自分で書いたクラス),受信データを送信する際のデータセパレータを構成する.
その中のReactorモードはネット上を参考にしていて、具体的なサイトはもう忘れています.
AcceptorReactorクラス:
/**
*
*/
package zys.net.tcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import zys.ThreadRunnable;
/**
* @author Administrator
*/
final class AcceptorReactor extends ThreadRunnable {
private ServerManager serverManager;
private Class<Session> sessionClass;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public AcceptorReactor() {
super();
}
/**
*
*/
public void run() {
try {
serverSocketChannel = ServerSocketChannel.open();
try {
ServerSocket sSocket = serverSocketChannel.socket();
try {
sSocket.bind(new InetSocketAddress(serverManager.getPort()));
serverSocketChannel.configureBlocking(false);
selector = Selector.open();
try {
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
serverManager.logInfo("Listener Reactor started.");
querySelector();
} finally {
selector.close();
}
} finally {
sSocket.close();
}
} finally {
serverSocketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @param aSelector
* @throws IOException
*/
private void querySelector() throws IOException {
ExecutorService pool = Executors.newFixedThreadPool(50);
try {
while (!Thread.interrupted()) {
int n = selector.select();
if (n != 0) {
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) (it.next());
pool.execute((Runnable) key.attachment());
it.remove();
}
}
}
} finally {
pool.shutdown();
try {
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
}
}
}
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocketChannel.accept();
if (c != null) {
c.socket().setSoLinger(true, 0);
serverManager.logInfo("One Session conncted.");
Session session = sessionClass.newInstance();
session.setManager(serverManager);
session.setSocketChannel(c);
session.setConnTime(new Date());
session.setConnIP(c.socket().getInetAddress().getHostAddress());
session.setConnStatus(Session.CONN_STATUS_CONNECT);
serverManager.registerSession(session);
}
} catch (Exception ex) {
// log
}
}
}
public ServerManager getServerManager() {
return serverManager;
}
public void setServerManager(ServerManager aServerManager) {
serverManager = aServerManager;
}
public Class<Session> getSessionClass() {
return sessionClass;
}
public void setSessionClass(Class<Session> aSessionClass) {
sessionClass = aSessionClass;
}
}
ここで,runメソッドには接続傍受が登録され,querySelectorでは接続要求がキャプチャされ,Acceptorのrunでは傍受の処理が実現される.
SessionReactorクラス:
/**
*
*/
package zys.net.tcp;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import zys.ThreadRunnable;
/**
* @author Administrator
*/
final class SessionReactor extends ThreadRunnable {
private ServerManager serverManager;
private Selector selector;
private ArrayList<Session> preparedSessions;
/**
*
*/
public SessionReactor() {
super();
preparedSessions = new ArrayList<Session>();
}
/**
* @param aSession
* @throws IOException
*/
public void registerSession(Session aSession) throws IOException {
synchronized (preparedSessions) {
preparedSessions.add(aSession);
}
selector.wakeup();
}
public void clearPreparedSessions() {
synchronized (preparedSessions) {
preparedSessions.clear();
}
selector.wakeup();
}
public void stop() throws Exception {
super.stop();
selector.wakeup();
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
try {
selector = Selector.open();
try {
serverManager.logInfo("Session Reactor started.");
querySelector();
} finally {
selector.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void querySelector() throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(200);
try {
int preparedCount = 0;
while (!Thread.interrupted()) {
synchronized (preparedSessions) {
preparedCount = preparedSessions.size();
if (preparedCount > 0) {
Iterator<Session> sessionIt = preparedSessions.iterator();
while (sessionIt.hasNext()) {
Session session = sessionIt.next();
SocketChannel channel = session.getSocketChannel();
channel.configureBlocking(false);
SelectionKey skReader = channel.register(selector, SelectionKey.OP_READ);
skReader.attach(new Reader(session));
serverManager.logInfo("One Session registered.");
}
preparedSessions.clear();
}
}
int n = selector.select();
if (n != 0) {
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
// dispatch((SelectionKey) (it.next()));
SelectionKey key = (SelectionKey) (it.next());
pool.execute((Runnable) key.attachment());
it.remove();
}
}
}
Iterator<SelectionKey> it = selector.keys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) (it.next());
Reader r = (Reader) (key.attachment());
key.cancel();
r.getSession().distroy();
}
} finally {
pool.shutdown();
try {
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow();
}
} catch (InterruptedException e) {
pool.shutdownNow();
}
}
}
class Reader implements Runnable { // inner
private Session session;
public Reader(Session aSession) {
session = aSession;
}
public void run() {
if(!session.isActive()){
return;
}
SocketChannel channel = session.getSocketChannel();
int count;
ByteBuffer buffer = null;
try {
synchronized(channel){
while(true){
buffer = ByteBuffer.allocate(10);
count = channel.read(buffer);
if (count > 0) {
buffer = ByteBuffer.allocate(Integer.valueOf(new String(buffer.array(), 0, count, serverManager.getCharSet())));
count = channel.read(buffer);
String sMsg = new String(buffer.array(), 0, count, serverManager.getCharSet());
serverManager.logDebugReceive(sMsg);
session.onReceive(sMsg);
}else{
if(count == -1){
if (session.isActive()) {
session.distroy();
}
}
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
serverManager.logError(this.getClass(), "Reader.run" , e.getMessage());
if (session.isActive()) {
session.distroy();
}
}
}
public Session getSession() {
return session;
}
}
public void setServerManager(ServerManager aServerManager) {
serverManager = aServerManager;
}
}
ここで、registerSessionには受信データを登録する必要があるSessionオブジェクトが用意されており、querySelectorにおけるsynchronized(preparedSessions){バーが用意したSessionオブジェクトは受信データ傍受として登録されており、その後受信データを処理する要求が処理され、Readerは受信データを処理するクラスである.
すべてのコードがアップロードされました.パッケージzys.net.tcpはコアクラス、パッケージサーバはテストクラス、機能はクライアント接続後1秒おきにクライアントに文字列変数COST_を送信するPARAMS_STRの値は、クライアントデータの事実更新を実現するため、送信形式は10ビットのデータ長を表す数字(例えば0000000013)、5ビットのデータ種別を表す数字(例えば20000、毎秒送信の同期データを表す)、続いて実際のデータである.telnet xxx.xxx.xxx.xxx.xxx.xxx 9999でテストすることができる.
すべてのコアコードおよびテストコードがアップロードされ、utf-8が符号化されています.
ユーザのインタラクションが頻繁な場合、サーバのオープンスレッドが多すぎてサーバの効率が低下するのではないかと心配されていますが、100クライアントの場合、データのインタラクションは正常で、遅延は少しも見えません.失敗したのはzysです.ThreadRunnable類は、いくつかの場所で引用されています.この類はズボンを脱いでおならをします.
批判を歓迎して、提案して、[email protected]に送ってください、ありがとうございます!