Java tcpサーバの基礎フレームワーク


その中で主に検討したいのは、接続を傍受するAcceptorReactorクラス、1つの傍受データが到着したSessionReactorクラス、1つのサーバ断主制御クラスServerManager、1つの制御データの送信、受信、ユーザー情報の格納を制御するSessionクラスである.
 
サーバが実行されている間は、mainプライマリスレッドと傍受接続スレッド、クライアントデータの到着を傍受するスレッドの3つのスレッドしか走っていません.クライアントデータが到着すると、スレッド処理が別途開かれ、処理が終了すると破棄されます.
 
使用する場合は、自己書き込みクラス継承ServerManagerが自己serverの機能を実現する必要があり、書き込みクラス継承Sessionが自己データ処理を実現する必要があり、server.propertiesでは,サーバポート番号,クライアントデータ符号化,ロードが必要なSessionクラス(すなわち,Sessionから継承された自分で書いたクラス),受信データを送信する際のデータセパレータを構成する.
 
その中のReactorモードはネット上を参考にしていて、具体的なサイトはもう忘れています.
 
AcceptorReactorクラス:
/**
 * 
 */
package zys.net.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import zys.ThreadRunnable;

/**
 * @author Administrator
 */
final class AcceptorReactor extends ThreadRunnable {

  private ServerManager serverManager;

  private Class<Session> sessionClass;

  private ServerSocketChannel serverSocketChannel;

  private Selector selector;

  public AcceptorReactor() {
    super();
  }

  /**
   * 
   */
  public void run() {
    try {
      serverSocketChannel = ServerSocketChannel.open();
      try {
        ServerSocket sSocket = serverSocketChannel.socket();
        try {
          sSocket.bind(new InetSocketAddress(serverManager.getPort()));
          serverSocketChannel.configureBlocking(false);
          selector = Selector.open();
          try {
            SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
            serverManager.logInfo("Listener Reactor started.");
            querySelector();
          } finally {
            selector.close();
          }
        } finally {
          sSocket.close();
        }
      } finally {
        serverSocketChannel.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * @param aSelector
   * @throws IOException
   */
  private void querySelector() throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(50);
    try {
      while (!Thread.interrupted()) {
        int n = selector.select();
        if (n != 0) {
          Iterator it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            SelectionKey key = (SelectionKey) (it.next());
            pool.execute((Runnable) key.attachment());
            it.remove();
          }
        }
      }
    } finally {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
      try {
        SocketChannel c = serverSocketChannel.accept();
        if (c != null) {
          c.socket().setSoLinger(true, 0);
          serverManager.logInfo("One Session conncted.");
          Session session = sessionClass.newInstance();
          session.setManager(serverManager);
          session.setSocketChannel(c);
          session.setConnTime(new Date());
          session.setConnIP(c.socket().getInetAddress().getHostAddress());
          session.setConnStatus(Session.CONN_STATUS_CONNECT);
          serverManager.registerSession(session);
        }
      } catch (Exception ex) {
        // log
      }
    }
  }

  public ServerManager getServerManager() {
    return serverManager;
  }

  public void setServerManager(ServerManager aServerManager) {
    serverManager = aServerManager;
  }

  public Class<Session> getSessionClass() {
    return sessionClass;
  }

  public void setSessionClass(Class<Session> aSessionClass) {
    sessionClass = aSessionClass;
  }
}

 
 
ここで,runメソッドには接続傍受が登録され,querySelectorでは接続要求がキャプチャされ,Acceptorのrunでは傍受の処理が実現される.
 
SessionReactorクラス:
/**
 * 
 */
package zys.net.tcp;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import zys.ThreadRunnable;

/**
 * @author Administrator
 */
final class SessionReactor extends ThreadRunnable {
  private ServerManager serverManager;

  private Selector selector;

  private ArrayList<Session> preparedSessions;

  /**
   * 
   */
  public SessionReactor() {
    super();
    preparedSessions = new ArrayList<Session>();
  }

  /**
   * @param aSession
   * @throws IOException
   */
  public void registerSession(Session aSession) throws IOException {
    synchronized (preparedSessions) {
      preparedSessions.add(aSession);
    }
    selector.wakeup();
  }

  public void clearPreparedSessions() {
    synchronized (preparedSessions) {
      preparedSessions.clear();
    }
    selector.wakeup();
  }

  public void stop() throws Exception {
    super.stop();
    selector.wakeup();
  }

  /*
   * (non-Javadoc)
   * 
   * @see java.lang.Runnable#run()
   */
  public void run() {
    try {
      selector = Selector.open();
      try {
        serverManager.logInfo("Session Reactor started.");
        querySelector();
      } finally {
        selector.close();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private void querySelector() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(200);
    try {
      int preparedCount = 0;
      while (!Thread.interrupted()) {
        synchronized (preparedSessions) {
          preparedCount = preparedSessions.size();
          if (preparedCount > 0) {
            Iterator<Session> sessionIt = preparedSessions.iterator();
            while (sessionIt.hasNext()) {
              Session session = sessionIt.next();
              SocketChannel channel = session.getSocketChannel();
              channel.configureBlocking(false);
              SelectionKey skReader = channel.register(selector, SelectionKey.OP_READ);
              skReader.attach(new Reader(session));
              serverManager.logInfo("One Session registered.");
            }
            preparedSessions.clear();
          }
        }

        int n = selector.select();
        if (n != 0) {
          Iterator it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            // dispatch((SelectionKey) (it.next()));
            SelectionKey key = (SelectionKey) (it.next());
            pool.execute((Runnable) key.attachment());
            it.remove();
          }
        }
      }

      Iterator<SelectionKey> it = selector.keys().iterator();
      while (it.hasNext()) {
        SelectionKey key = (SelectionKey) (it.next());
        Reader r = (Reader) (key.attachment());
        key.cancel();
        r.getSession().distroy();
      }
    } finally {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
      }
    }
  }

  class Reader implements Runnable { // inner
    private Session session;

    public Reader(Session aSession) {
      session = aSession;
    }

    public void run() {
      if(!session.isActive()){
        return;
      }
      SocketChannel channel = session.getSocketChannel();
      int count;
      ByteBuffer buffer = null;
      try {
        synchronized(channel){
          while(true){
            buffer = ByteBuffer.allocate(10);
            count = channel.read(buffer);
            if (count > 0) {
              buffer = ByteBuffer.allocate(Integer.valueOf(new String(buffer.array(), 0, count, serverManager.getCharSet())));
              count = channel.read(buffer);
              String sMsg = new String(buffer.array(), 0, count, serverManager.getCharSet());
              serverManager.logDebugReceive(sMsg);
              session.onReceive(sMsg);
            }else{
              if(count == -1){
                if (session.isActive()) {
                  session.distroy();
                }
              }
              break;
            }
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
        serverManager.logError(this.getClass(), "Reader.run" , e.getMessage());
        if (session.isActive()) {
          session.distroy();
        }
      }
    }

    public Session getSession() {
      return session;
    }
  }

  public void setServerManager(ServerManager aServerManager) {
    serverManager = aServerManager;
  }
}

 
 
ここで、registerSessionには受信データを登録する必要があるSessionオブジェクトが用意されており、querySelectorにおけるsynchronized(preparedSessions){バーが用意したSessionオブジェクトは受信データ傍受として登録されており、その後受信データを処理する要求が処理され、Readerは受信データを処理するクラスである.
 
すべてのコードがアップロードされました.パッケージzys.net.tcpはコアクラス、パッケージサーバはテストクラス、機能はクライアント接続後1秒おきにクライアントに文字列変数COST_を送信するPARAMS_STRの値は、クライアントデータの事実更新を実現するため、送信形式は10ビットのデータ長を表す数字(例えば0000000013)、5ビットのデータ種別を表す数字(例えば20000、毎秒送信の同期データを表す)、続いて実際のデータである.telnet xxx.xxx.xxx.xxx.xxx.xxx 9999でテストすることができる.
 
すべてのコアコードおよびテストコードがアップロードされ、utf-8が符号化されています.
 
ユーザのインタラクションが頻繁な場合、サーバのオープンスレッドが多すぎてサーバの効率が低下するのではないかと心配されていますが、100クライアントの場合、データのインタラクションは正常で、遅延は少しも見えません.失敗したのはzysです.ThreadRunnable類は、いくつかの場所で引用されています.この類はズボンを脱いでおならをします.
 
批判を歓迎して、提案して、[email protected]に送ってください、ありがとうございます!