class Reactor implements Runnable {
final Selector selector;
//ServerSocketChannel
// , java.net.ServerSocket , TCP IO , OP_ACCEPT 。
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open(); //
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
// channel blocking ,
// non-blocking SelectableChannel IO 。
serverSocket.configureBlocking(false); // non-blocking 。
/**
*SelectionKey register(Selector sel, int ops)
* channel Selector SelectionKey。
* , Selector select() channel。ops bit mask, IO 。
*SelectionKey register(Selector sel, int ops, Object att)
* , att attachment SelectionKey , session state 。
*Selector 4 4 IO , bit mask。
*int OP_ACCEPT : accept,ServerSocketChannel IO。
*int OP_CONNECT: ( ),SocketChannel IO。
*/
SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor()); // attachment
}
/*
Alternatively, use explicit SPI provider:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
/**
* Selector , 3 SelectionKey :
*1. key set Selector channel, keys() 。
*2. Selected-key set select() IO channel, selectedKeys() 。
*3. Cancelled-key set cancel channel, select() , channel SelectionKey key set cancelled-key set 。 。
*/
// channel, IO , ,
// SelectionKey selected-key set。
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}
class Acceptor implements Runnable { // inner
public void run() {
try {
//SocketChannel accept() : , SocketChannel 。
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
}catch(IOException ex) { /* ... */ }
}
}
}
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
/**
* SocketChannel
* , java.net.Socket , TCP IO ,
* OP_CONNECT,OP_READ OP_WRITE 。
*/
// ? sk = socket.register(sel, SelectionKey.OP_READ, this)
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
// select() 。
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }
public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
// void cancel() : cancel SelectionKey 。
if (outputIsComplete()) sk.cancel();
}
}
/**
*
*/
/**
* ========= =============
* GoF State-Object pattern
* , " "
* :http://www.jdon.com/designpatterns/designpattern_State.htm
*
*/
class Handler {
// ...
public void run() { // initial state is reader
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
}
/**
* ========= =============
* Handler with Thread Pool
*
*/
class Handler implements Runnable {
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}
/**
* ========= =============
* Multiple Reactor Threads
*
*/
//Use to match CPU and IO rates
//Static or dynamic construction
//" Each with own Selector, Thread, dispatch loop
//Main acceptor distributes to other reactors
Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length) next = 0;
}
}