Scalable IO in Javaの簡単な解読



    class Reactor implements Runnable {
    final Selector selector;
    //ServerSocketChannel
    //      ,   java.net.ServerSocket   ,   TCP  IO  ,  OP_ACCEPT  。
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();  //    
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //   channel       blocking  ,
        //   non-blocking SelectableChannel       IO  。
        serverSocket.configureBlocking(false); //  non-blocking  。
        /**
        *SelectionKey register(Selector sel, int ops)
        *   channel     Selector       SelectionKey。
        *    ,    Selector select()         channel。ops       bit mask,        IO  。
        *SelectionKey register(Selector sel, int ops, Object att)
        *             ,    att     attachment       SelectionKey ,        session state       。
        *Selector   4        4 IO  ,                bit mask。
        *int OP_ACCEPT :          accept,ServerSocketChannel      IO。
        *int OP_CONNECT:         (   ),SocketChannel      IO。
        */
        SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());  //   attachment
    }
    /*
    Alternatively, use explicit SPI provider:
    SelectorProvider p = SelectorProvider.provider();
    selector = p.openSelector();
    serverSocket = p.openServerSocketChannel();
    */


    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                /**
                *   Selector , 3 SelectionKey   :
                *1. key set          Selector  channel,        keys()    。
                *2. Selected-key set       select()         IO   channel,        selectedKeys()  。
                *3. Cancelled-key set     cancel      channel,    select()   ,  channel   SelectionKey  key set cancelled-key set   。          。
                */
                //       channel,       IO       ,     ,
                //     SelectionKey  selected-key set。
                selector.select();  
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey)(it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null)
            r.run();
    }


    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                //SocketChannel accept() :      ,         SocketChannel  。
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            }catch(IOException ex) { /* ... */ }
        }
    }
}


final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c; 
        c.configureBlocking(false);
        // Optionally try first read now
        /**
        * SocketChannel
        *       ,   java.net.Socket   ,   TCP  IO  ,
        *   OP_CONNECT,OP_READ OP_WRITE  。
        */
        //        ?   sk = socket.register(sel, SelectionKey.OP_READ, this)
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        //         select()      。
        sel.wakeup();
    }

    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }


    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
    void send() throws IOException {
        socket.write(output);
        // void cancel() : cancel  SelectionKey        。
        if (outputIsComplete()) sk.cancel();
    }
}


/**
*      
*/

/**
 * =========  =============
 * GoF State-Object pattern 
 *     ,   "    "   
 *   :http://www.jdon.com/designpatterns/designpattern_State.htm
 *
*/

class Handler { 
    // ...

    public void run() { // initial state is reader
        socket.read(input);
        if (inputIsComplete()) {
            process();
            sk.attach(new Sender());
            sk.interest(SelectionKey.OP_WRITE);
            sk.selector().wakeup();
        }
    }
    class Sender implements Runnable {
        public void run(){ // ...
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }
}


/**
 * =========  =============
 * Handler with Thread Pool
 *
*/

class Handler implements Runnable {
    // uses util.concurrent thread pool
    static PooledExecutor pool = new PooledExecutor(...);
    static final int PROCESSING = 3;
    // ...
    synchronized void read() { // ...
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            pool.execute(new Processer());
        }
    }
    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interest(SelectionKey.OP_WRITE);
    }
    class Processer implements Runnable {
        public void run() { processAndHandOff(); }
    }
}


/**
 * =========  =============
 * Multiple Reactor Threads
 *
*/

    //Use to match CPU and IO rates
    //Static or dynamic construction
    //" Each with own Selector, Thread, dispatch loop
    //Main acceptor distributes to other reactors

    Selector[] selectors; // also create threads
    int next = 0;
    class Acceptor { // ...
        public synchronized void run() { ...
            Socket connection = serverSocket.accept();
            if (connection != null)
            new Handler(selectors[next], connection);
            if (++next == selectors.length) next = 0;
        }
    }