JAVA NIOの三

6321 ワード

参照
  
このセクションはJDK 1.5を採用した後、java.util.co ncurrentパッケージのAPIサーバーがスレッドの読み取りを実現し、Exectorsツールを採用して、スレッドの池を迅速に作成することができます.また、Exector Serviceサブクラスがカスタムで作成することもできます.     サービス側と接続して情報を送信した後、コネクションSOCKETのショートコネクション(HTTPはショートコネクション)をクローズし、SOCKETの長コネクションを採用すると、「心拍検出」を追加する必要があり、本節では長コネクションはまだ実現されていません.    Selector Polling読み取り可能なイベントの場合、読み返す問題があります.解決策は、読んだコードブロックに下記のコードselectKey.cancel()またはselectKey.interestOps(selectKey.interestOps)&(~SelectKey.OPuREAD)を追加します.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*    
*/
public class SocketServer {

	/**
	 *          
	 */
	public static final int DEFAULT_PORT = 9999;

	/**
	 *    
	 */
	private Selector selector;
	/**
	 *      
	 */
	private ExecutorService pool;
	
	public SocketServer(String ip, int port) {
		ServerSocketChannel ssc = null;
		try {
			int _port = DEFAULT_PORT;
			if (port > 0)
				_port = port;
			/*      */
			ssc = ServerSocketChannel.open();
			/*       */
			ssc.configureBlocking(false);
			/**
			 *        ServerSocketChannel  bind()  ,
			 *           ServerSocket         
			 *           
			 */
			ssc.socket().bind(new InetSocketAddress(ip, _port));
			/*       */
			this.selector = Selector.open();
			/*           */
			ssc.register(this.selector, SelectionKey.OP_ACCEPT);
			
			/**
			 *     Executors,       ,        
			 * CPU、         ,           (
			 *         
			 * ),            
			 * */
			pool = new ThreadPoolExecutor(1, 2, 2000L,
					TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(5));
		} catch (IOException e2) {
			System.out.println("       !");
			e2.printStackTrace();
			try {
				if(ssc != null) ssc.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 *      
	 * @throws Exception
	 */
	public void pollSelect() throws Exception {
		/* (  )     ,      */
		while (true) {
			int readyChannels = 0;
			/*       */
			if(this.selector.isOpen()){
				readyChannels = this.selector.select();
			}
			if(readyChannels == 0) continue;
			/*          */
			Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
			while (it.hasNext()) {
				SelectionKey selectKey = it.next();
				it.remove();
				try {
					process(selectKey);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 *     
	 * @param selectKey
	 */
	public void process(SelectionKey selectKey) throws Exception{
		/*         */
		if (selectKey.isAcceptable()) { 
			accept(selectKey);
		} else if (selectKey.isReadable()) { /*      */
			read(selectKey);
			/**
			 *      
			 *               
			 * selectKey.interestOps(selectKey.interestOps() & (~SelectionKey.OP_READ));
			 * */
			selectKey.cancel();
		}
	}
	
	/**
	 *     
	 * @param selectKey
	 */
	public void accept(SelectionKey selectKey) throws Exception {
		SocketChannel sc = null;
		try {
			ServerSocketChannel ssc = (ServerSocketChannel) selectKey
					.channel();
			sc = ssc.accept();
			sc.configureBlocking(false);
			System.out.println("   :"
                    + sc.socket().getInetAddress().getHostAddress()+"  "+sc.socket().getPort()
                    + "    ");
			sc.register(this.selector, SelectionKey.OP_READ);
		} catch (Exception e) {
			if(sc!=null) 
				sc.close();
			throw new IOException("        !");
		}
	}
	
	/**
	 *     
	 * @param selectKey
	 */
	public void read(SelectionKey selectKey) throws Exception{
		SocketChannel sc = (SocketChannel) selectKey
				.channel();
        /*       */
		pool.submit(new SocketServerReadThread(sc));
	}
	
	
	public static void main(String[] args) {
		SocketServer ss = null;
		try {
			ss = new SocketServer("localhost", 9999);
			ss.pollSelect();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}


import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 *        
 * @author oy
 *
 */
public class SocketServerReadThread implements Runnable{
	
	private SocketChannel channel;
	
	public SocketServerReadThread(SocketChannel channel){
		this.channel = channel;
	}
	
	@Override
	public void run() {
		try {
			//         
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			channel.read(buffer);
			byte[] data = buffer.array();
			ByteArrayInputStream bais = new ByteArrayInputStream(data);
			ObjectInputStream ois = new ObjectInputStream(bais);
			User user = (User)ois.readObject();
			System.out.println("   :"
                    + channel.socket().getInetAddress().getHostAddress()+"  "+channel.socket().getPort() + "   : " + user.toString());
		} catch (Exception e) {
			System.out.println("        !");
			try {
				if(channel != null) 
					channel.close();
			} catch (IOException e1) {
				e1.printStackTrace();
			}
		}
	}
}