社内で行ったNIOに関する共有会の内容を紹介します。


最近、社内でNIOについての共有会を行いました。内容はJDK 1.6に基づいています。PPTはあまり好きではないので、デモだけを書き、そのコードを共有することにしました。NIOの使い方や拡張の方法については、コード中のコメントに書いてあります。必要としている方の参考になれば幸いです。
 
 
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Small NIO demo server (written against JDK 1.6).
 *
 * <p>Two threads cooperate:
 * <ul>
 * <li>{@link Accept} selects on {@code acceptSelector}, accepts incoming
 * connections and hands them over through {@code waitRegeditChannel};</li>
 * <li>{@code RWThread} registers the handed-over channels with
 * {@code rwSelector} for {@code OP_READ} and serves them.</li>
 * </ul>
 *
 * <p>The protocol is line based: bytes are buffered per connection (as the
 * selection key attachment) until the last received byte is a line feed.
 * The request {@code gettime} is answered with the current date; a request
 * ending in {@code clear} resets the per-connection buffer.
 *
 * @author coffee
 * mail:[email protected]
 */
public class NIoTest {

	private static final Logger logger = LoggerFactory.getLogger(NIoTest.class);

	/** Selector that only watches the listening socket for OP_ACCEPT. */
	private Selector acceptSelector;
	/** Selector that watches all accepted connections for OP_READ. */
	private Selector rwSelector;
	/** Accepted channels waiting to be registered with {@code rwSelector} by the read/write thread. */
	private final BlockingQueue<SocketChannel> waitRegeditChannel = new LinkedBlockingQueue<SocketChannel>();

	/**
	 * Starts the demo server.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		NIoTest ns = new NIoTest();
		ns.start();
	}

	/**
	 * Opens both selectors, binds the listening socket to 127.0.0.1:8888 and
	 * starts the accept and read/write threads. If any step fails the error is
	 * logged and everything opened so far is closed again.
	 */
	public void start() {
		InetSocketAddress localAddress = new InetSocketAddress("127.0.0.1", 8888);
		ServerSocketChannel serverSocketChannel = null;
		try {
			acceptSelector = Selector.open();
			rwSelector = Selector.open();
			serverSocketChannel = ServerSocketChannel.open();
			// Selectors require non-blocking channels.
			serverSocketChannel.configureBlocking(false);
			ServerSocket socket = serverSocketChannel.socket();
			// Socket options have to be set before bind() to take effect.
			socket.setReuseAddress(false);
			socket.setSoTimeout(60000);
			socket.setReceiveBufferSize(1024);
			socket.bind(localAddress);
			serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
			// One thread for accepting, one for reading/writing.
			Executor pool = Executors.newFixedThreadPool(2);
			pool.execute(new Accept());
			pool.execute(new RWThread());
		} catch (IOException e) {
			logger.error("Failed to start NIO server on " + localAddress, e);
			// Do not leak the half-initialised resources.
			closeQuietly(serverSocketChannel);
			closeSelectorQuietly(acceptSelector);
			closeSelectorQuietly(rwSelector);
		}
	}

	/** Closes a channel (or any closeable), logging instead of propagating a failure; null is ignored. */
	private static void closeQuietly(java.io.Closeable closeable) {
		if (closeable == null) {
			return;
		}
		try {
			closeable.close();
		} catch (IOException e) {
			logger.error("Failed to close " + closeable, e);
		}
	}

	/** Closes a selector, logging instead of propagating a failure; null is ignored. */
	private static void closeSelectorQuietly(Selector selector) {
		if (selector == null) {
			return;
		}
		try {
			selector.close();
		} catch (IOException e) {
			logger.error("Failed to close selector", e);
		}
	}

	/**
	 * Accept loop: waits for OP_ACCEPT on {@code acceptSelector} and passes every
	 * new connection to the read/write thread.
	 */
	public class Accept implements Runnable {
		@Override
		public void run() {
			while (true) {
				try {
					int count = acceptSelector.select(500);
					if (count <= 0) {
						continue;
					}
					Iterator<SelectionKey> keys = acceptSelector.selectedKeys().iterator();
					while (keys.hasNext()) {
						SelectionKey key = keys.next();
						// The selected-key set is never cleared by the selector itself.
						keys.remove();
						acceptConnection((ServerSocketChannel) key.channel());
					}
				} catch (Exception e) {
					// Thread boundary: log and keep the accept loop alive.
					logger.error("Error while accepting connections", e);
				}
			}
		}

		/**
		 * Accepts one pending connection and queues it for registration.
		 *
		 * @param serverSocketChannel the listening channel that reported OP_ACCEPT
		 * @throws IOException if accepting or configuring the new channel fails
		 */
		private void acceptConnection(ServerSocketChannel serverSocketChannel) throws IOException {
			SocketChannel socketChannel = serverSocketChannel.accept();
			if (socketChannel == null) {
				// In non-blocking mode accept() returns null when no connection is pending.
				return;
			}
			try {
				socketChannel.configureBlocking(false);
			} catch (IOException e) {
				closeQuietly(socketChannel);
				throw e;
			}
			// The channel is not registered with rwSelector here; it is handed to
			// the read/write thread, which performs the registration itself.
			// offer() never fails on the unbounded queue and, unlike put(), cannot
			// be interrupted.
			waitRegeditChannel.offer(socketChannel);
			// Wake the read/write thread so it registers the new channel promptly
			// instead of waiting for its select timeout.
			rwSelector.wakeup();
		}
	}

	/**
	 * Read/write loop: registers newly accepted channels with {@code rwSelector}
	 * and serves every channel that becomes readable.
	 */
	private class RWThread implements Runnable {
		/*
		 * (non-Javadoc)
		 * 
		 * @see java.lang.Runnable#run()
		 */
		@Override
		public void run() {
			while (true) {
				try {
					registerPendingChannels();
					int count = rwSelector.select(1000);
					if (count <= 0) {
						continue;
					}
					Iterator<SelectionKey> keys = rwSelector.selectedKeys().iterator();
					while (keys.hasNext()) {
						SelectionKey key = keys.next();
						keys.remove();
						try {
							processKey(key);
						} catch (RuntimeException e) {
							// A failure on one connection must not abort the rest of the batch.
							logger.error("Error while processing " + key.channel(), e);
						}
					}
				} catch (Exception e) {
					// Thread boundary: log and keep the read/write loop alive.
					logger.error("Error in read/write loop", e);
				}
			}
		}

		/** Drains the hand-over queue and registers every channel for OP_READ. */
		private void registerPendingChannels() {
			SocketChannel socketChannel;
			while ((socketChannel = waitRegeditChannel.poll()) != null) {
				try {
					socketChannel.register(rwSelector, SelectionKey.OP_READ);
					logger.debug("       :" + socketChannel.socket());
				} catch (IOException e) {
					// E.g. the peer disconnected before the registration happened.
					logger.error("Failed to register " + socketChannel.socket(), e);
					closeQuietly(socketChannel);
				}
			}
		}

		/**
		 * Reads the available bytes of one readable connection and, once a full
		 * line has been received, writes the reply.
		 *
		 * @param key selection key of the readable connection
		 */
		private void processKey(SelectionKey key) {
			SocketChannel socketChannel = (SocketChannel) key.channel();
			ByteBuffer bb = ByteBuffer.allocate(1024);
			int count;
			try {
				count = socketChannel.read(bb);
			} catch (IOException e) {
				// The connection is broken; drop it instead of processing an empty buffer.
				logger.error("Read failed, closing " + socketChannel.socket(), e);
				closeConnection(key, socketChannel);
				return;
			}
			if (count < 0) {
				// End of stream: the peer closed the connection.
				closeConnection(key, socketChannel);
				return;
			}
			if (count == 0) {
				// Nothing was read; there is nothing to append or evaluate.
				return;
			}
			// Switch the buffer from filling to draining.
			bb.flip();
			byte[] tmpbytes = new byte[bb.limit()];
			bb.get(tmpbytes);
			logger.debug("     :" + new String(tmpbytes));
			if (isCache(key, tmpbytes)) {
				// Line not complete yet; wait for more data.
				return;
			}
			byte[] bytes = (byte[]) key.attachment();
			String requestStr = new String(bytes);
			logger.debug("     :" + requestStr);
			bb.clear();
			if (requestStr.equals("gettime")) {
				bb.put(new Date().toString().getBytes());
				key.attach(new byte[0]);
			} else if (requestStr.endsWith("clear")) {
				key.attach(new byte[0]);
				bb.put(encodeGb2312("     "));
			} else {
				// The buffered request stays attached, so following input is
				// appended to it until a request ending in "clear" arrives.
				bb.put(encodeGb2312("       "));
			}
			bb.flip();
			try {
				// NOTE(review): a single non-blocking write may be partial; the
				// replies here are tiny, larger ones would need OP_WRITE handling.
				socketChannel.write(bb);
			} catch (IOException e) {
				logger.error("Write failed, closing " + socketChannel.socket(), e);
				closeConnection(key, socketChannel);
			}
		}

		/** Cancels the key and closes the channel of a finished or broken connection. */
		private void closeConnection(SelectionKey key, SocketChannel socketChannel) {
			key.cancel();
			closeQuietly(socketChannel);
		}

		/**
		 * Encodes a reply text as GB2312.
		 *
		 * @return the encoded bytes, or an empty array if the charset is unavailable
		 */
		private byte[] encodeGb2312(String text) {
			try {
				return text.getBytes("GB2312");
			} catch (UnsupportedEncodingException e) {
				logger.error("GB2312 is not supported by this JVM", e);
				return new byte[0];
			}
		}

		/**
		 * Appends the freshly read bytes to the bytes buffered on the key and
		 * decides whether the request is still incomplete.
		 *
		 * <p>A request is complete when the last byte is a line feed. The line
		 * terminator ("\n" or "\r\n") is stripped and the remaining bytes are
		 * attached to the key as the request. Otherwise all bytes received so far
		 * are attached to the key.
		 *
		 * @param key      selection key whose attachment holds the buffered bytes
		 * @param tmpbytes bytes read in the current pass
		 * @return {@code true} if the data is only cached (more input needed),
		 *         {@code false} if a complete request is now attached to the key
		 */
		private boolean isCache(SelectionKey key, byte[] tmpbytes) {
			Object obj = key.attachment();
			byte[] cached = (obj != null) ? (byte[]) obj : new byte[0];
			int sumLength = cached.length + tmpbytes.length;
			byte[] combined = new byte[sumLength];
			System.arraycopy(cached, 0, combined, 0, cached.length);
			System.arraycopy(tmpbytes, 0, combined, cached.length, tmpbytes.length);
			if (sumLength == 0 || combined[sumLength - 1] != '\n') {
				key.attach(combined);
				return true;
			}
			// Drop the trailing "\n" and, if present, the "\r" in front of it.
			int requestLength = sumLength - 1;
			if (requestLength > 0 && combined[requestLength - 1] == '\r') {
				requestLength--;
			}
			byte[] request = new byte[requestLength];
			System.arraycopy(combined, 0, request, 0, requestLength);
			key.attach(request);
			return false;
		}
	}
}