JAVA NIOの三
6321 ワード
参照
このセクションはJDK 1.5を採用した後、java.util.co ncurrentパッケージのAPIサーバーがスレッドの読み取りを実現し、Exectorsツールを採用して、スレッドの池を迅速に作成することができます.また、Exector Serviceサブクラスがカスタムで作成することもできます. サービス側と接続して情報を送信した後、コネクションSOCKETのショートコネクション(HTTPはショートコネクション)をクローズし、SOCKETの長コネクションを採用すると、「心拍検出」を追加する必要があり、本節では長コネクションはまだ実現されていません. Selector Polling読み取り可能なイベントの場合、読み返す問題があります.解決策は、読んだコードブロックに下記のコードselectKey.cancel()またはselectKey.interestOps(selectKey.interestOps)&(~SelectKey.OPuREAD)を追加します.
このセクションはJDK 1.5を採用した後、java.util.co ncurrentパッケージのAPIサーバーがスレッドの読み取りを実現し、Exectorsツールを採用して、スレッドの池を迅速に作成することができます.また、Exector Serviceサブクラスがカスタムで作成することもできます. サービス側と接続して情報を送信した後、コネクションSOCKETのショートコネクション(HTTPはショートコネクション)をクローズし、SOCKETの長コネクションを採用すると、「心拍検出」を追加する必要があり、本節では長コネクションはまだ実現されていません. Selector Polling読み取り可能なイベントの場合、読み返す問題があります.解決策は、読んだコードブロックに下記のコードselectKey.cancel()またはselectKey.interestOps(selectKey.interestOps)&(~SelectKey.OPuREAD)を追加します.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class SocketServer {
/**
*
*/
public static final int DEFAULT_PORT = 9999;
/**
*
*/
private Selector selector;
/**
*
*/
private ExecutorService pool;
public SocketServer(String ip, int port) {
ServerSocketChannel ssc = null;
try {
int _port = DEFAULT_PORT;
if (port > 0)
_port = port;
/* */
ssc = ServerSocketChannel.open();
/* */
ssc.configureBlocking(false);
/**
* ServerSocketChannel bind() ,
* ServerSocket
*
*/
ssc.socket().bind(new InetSocketAddress(ip, _port));
/* */
this.selector = Selector.open();
/* */
ssc.register(this.selector, SelectionKey.OP_ACCEPT);
/**
* Executors, ,
* CPU、 , (
*
* ),
* */
pool = new ThreadPoolExecutor(1, 2, 2000L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(5));
} catch (IOException e2) {
System.out.println(" !");
e2.printStackTrace();
try {
if(ssc != null) ssc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
*
* @throws Exception
*/
public void pollSelect() throws Exception {
/* ( ) , */
while (true) {
int readyChannels = 0;
/* */
if(this.selector.isOpen()){
readyChannels = this.selector.select();
}
if(readyChannels == 0) continue;
/* */
Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selectKey = it.next();
it.remove();
try {
process(selectKey);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
*
* @param selectKey
*/
public void process(SelectionKey selectKey) throws Exception{
/* */
if (selectKey.isAcceptable()) {
accept(selectKey);
} else if (selectKey.isReadable()) { /* */
read(selectKey);
/**
*
*
* selectKey.interestOps(selectKey.interestOps() & (~SelectionKey.OP_READ));
* */
selectKey.cancel();
}
}
/**
*
* @param selectKey
*/
public void accept(SelectionKey selectKey) throws Exception {
SocketChannel sc = null;
try {
ServerSocketChannel ssc = (ServerSocketChannel) selectKey
.channel();
sc = ssc.accept();
sc.configureBlocking(false);
System.out.println(" :"
+ sc.socket().getInetAddress().getHostAddress()+" "+sc.socket().getPort()
+ " ");
sc.register(this.selector, SelectionKey.OP_READ);
} catch (Exception e) {
if(sc!=null)
sc.close();
throw new IOException(" !");
}
}
/**
*
* @param selectKey
*/
public void read(SelectionKey selectKey) throws Exception{
SocketChannel sc = (SocketChannel) selectKey
.channel();
/* */
pool.submit(new SocketServerReadThread(sc));
}
public static void main(String[] args) {
SocketServer ss = null;
try {
ss = new SocketServer("localhost", 9999);
ss.pollSelect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
*
* @author oy
*
*/
public class SocketServerReadThread implements Runnable{
private SocketChannel channel;
public SocketServerReadThread(SocketChannel channel){
this.channel = channel;
}
@Override
public void run() {
try {
//
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
byte[] data = buffer.array();
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais);
User user = (User)ois.readObject();
System.out.println(" :"
+ channel.socket().getInetAddress().getHostAddress()+" "+channel.socket().getPort() + " : " + user.toString());
} catch (Exception e) {
System.out.println(" !");
try {
if(channel != null)
channel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}