SocketChannel/ServerSocketChannel/Selector

17189 ワード

Channel
 
Channelは、データブロックを操作し、bufferに書き込み、またはIOからbufferに読み出すために使用される.
 
SocketChannel
 
作成
 
//作成、接続待ちのブロック
SocketChannel sc = SocketChannel.open(new InetSocketAddress("abc.com",80));
 
//connect()接続を使用した作成
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("abc.com",80));//blocking
 
//作成、非ブロック
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);    //set no-block
sc.connect(new InetSocketAddress("abc.com",80));//no-blocking
 
//非ブロックモードの一般的な処理方法——ポーリング!
while(true)
{
     if(sc.
finishConnect()/*はすぐに戻り、接続はtrueを返します.そうでなければfalse*/){
          break; 
         /* connected,start write */
     }
}
 
.isConnected()
.isConnectionPending()/setup中、まだオープンしていません
 
 
読み取り
 
int sc.read(ByteBuffer dst)
できるだけ多くのデータをdstに読みます.
読み出したバイト数を返し、読み出しが完了したら-1を返します.
 
while(buffer.hasRemaining() && channel.read(buffer)!=-1 )
{
}
 
Scatterは複数のBufferに読み取りコピーする
 
ByteBuffer[] buffers = new ByteBuffer[2];
buffers[0] = ByteBuffer.allocate(1000);
buffers[1] = ByteBuffer.allocate(1000);
while(buffer[1].hasRemaining() && channel.read(buffer)!=-1 )
{
}
 
 
書き込み
 
while(buffer.hasRemaining() && channel.write(buffer)!=-1 )
{
}
 
gatherは複数のbufferから読み出し、同じsocketに書き込む
 
channel.write(ByteBuffer[] dsts);
 
 
閉じる
 
if( channel.isOpen() )
{
     channel.close();
}
 
ServerSocketChannel
 
 
作成
 
ServerSocketChannel ssc = ServerSocketChannel.open();//Openは開くのではなく、作成します
ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress(80));
 
かんし
 
.appcept();
デフォルトは
blockingモード.
non-blockingモードで、.accept()は、接続されていない場合にすぐにnullを返します.Selector処理が必要です.
 
 
Selector
 
Selector selector = Selector.open();
 
selectorにchannelを登録する
ServerSocketChannel ssc = ... ;
ssc.register(selector,SelectionKey.OP_ACCEPT/*リスニングイベント*/|SelectionKey.OP_CONNECT);
ssc.register(selector,OP,object state);
 
イベントのタイプ
SelectionKey.OP_ACCEPT
SocketChannelこのイベントはありません.Server SocketChannelには
          
SelectionKey.OP_CONNECT          
SelectionKey.OP_READ
SelectionKey.OP_WRITE
 
各チャンネルがselectorに登録されたら、いつでも
イベントの処理が必要なチャネルをポーリング
selector.selectNow()/no-blocking、なければ0を返します
selector.select()    // 
少なくとも1つのイベントまでblocing
selector.select(long timeout)
 
イベントの準備ができたら、対応するchannelを取得します.
Set selector.
selectedKeys();
 
閉じる
selector.close()
 
 
SelectionKey
 
selectedKeyを処理するときは、どのイベントかを判断します.
SelectionKey key = selector.selectedKeys().get(0);
if(key.isAcceptable()){}
if(key.isConnectionable()){}
 
//対応するチャンネルを取得する
SelectableChannel c = key.channel();
 
//追加ステータスの取得
key.attachment()
 
//フォローしない
key.cancel()
 
DEMO:
ブロック・モードのSocketChannelは、ブロック・モードのServer SocketChannelと対話します(Selectorを使用する必要はありません).
 
    /**

    *  ServerSocketChannel

    */

    public static void startBlockingServerChannel() {

        new Thread(new Runnable() {

                @Override

                public void run() {

                    ServerSocketChannel serverSocketChannel = null;



                    try {

                        while (true) {

                            serverSocketChannel = ServerSocketChannel.open();

                            serverSocketChannel.configureBlocking(true);

                            serverSocketChannel.bind(new InetSocketAddress(90));



                            System.out.println("[server]accepting...");



                            SocketChannel client = serverSocketChannel.accept();



                            ByteBuffer dst = ByteBuffer.allocate(1024);

                            client.read(dst);



                            String result = new String(dst.array(), "UTF-8");



                            System.out.println("->" + result);

                            serverSocketChannel.close();

                        }

                    } catch (IOException e) {

                        e.printStackTrace();

                    } finally {

                        serverSocketChannel.close();

                    }

                }

            }).start();

    }



    /**

    *  SocketChanel ServerSocketChannel

    */

    public static void startBlockingClinetUsingSocketChannel() {

        new Thread(new Runnable() {

                @Override

                public void run() {

                    SocketChannel s = null;



                    try {

                        System.out.println("[client]start a client socket:");

                        s = SocketChannel.open();

                        //s.configureBlocking(true);

                        System.out.println("[client]try connect...");

                        s.connect(new InetSocketAddress("127.0.0.1", 90));

                        System.out.println("[client]after connect");



                        ByteBuffer src = ByteBuffer.allocate(20);

                        src.put("Hello".getBytes("UTF-8"));

                        src.flip(); //  

                        s.write(src);

                        s.close();

                    } catch (Exception e) {

                        e.printStackTrace();

                    } finally {

                        try {

                            s.close();

                        } catch (IOException e) {

                            e.printStackTrace();

                        }

                    }

                }

            }).start();

    }

 
 
DEMO:
非ブロックモードのSocketChannelをクライアントとして
 
    public static void startNoBlocingSocketChannelAsClinet() {

        SocketChannel socketChannel = null;



        try {

            System.out.println("[client]start a client socket:");

            s = SocketChannel.open();

            s.configureBlocking(false);

            s.connect(new InetSocketAddress("127.0.0.1", 90));



            while (true) {

                if (s.finishConnect()) {

                    System.out.println("[client]connected...");



                    ByteBuffer src = ByteBuffer.allocate(100);

                    src.put("Hello".getBytes("UTF-8"));

                    src.flip();

                    s.write(src);



                    break;

                }

            }

        } catch (Exception e) {

        } finally { /*close*/

        }

    }

 
 
DEMO:
非ブロッキング・モードでのServer SocketChannel+Selector実装サーバ(シングル・スレッド・マルチ・チャネル)
 
   public static void startNoBlockingServerChannel() {

        new Thread(new Runnable() {

                @Override

                public void run() {

                    ServerSocketChannel serverSocketChannel = null;

                    Selector selector = null;



                    try {

                        selector = Selector.open();



                        serverSocketChannel = ServerSocketChannel.open();

                        serverSocketChannel.configureBlocking(false);

                        serverSocketChannel.socket()

                                           .bind(new InetSocketAddress(90));



                        // register to selector

                        serverSocketChannel.register(selector,

                            SelectionKey.OP_ACCEPT);



                        //  selector

                        while (true) {

                            System.out.println(

                                "[server] Selector select and blocking ...");

                            selector.select(); // blocking,wait until 1 event



                            Set<SelectionKey> keys = selector.selectedKeys(); // not .keys()

                            Iterator iter = keys.iterator();



                            while (iter.hasNext()) {

                                SelectionKey key = (SelectionKey) iter.next();

                                iter.remove();



                                //   if else

                                if (key.isValid() == false) {

                                    continue;

                                } else if (key.isAcceptable()) {

                                    SocketChannel client = ((ServerSocketChannel) key.channel()).accept();

                                    client.configureBlocking(false); //  ( , )

                                    client.register(selector,

                                        SelectionKey.OP_READ); //  Selector 

                                } else if (key.isReadable()) {

                                    // start read

                                    SocketChannel client = (SocketChannel) key.channel();



                                    ByteBuffer bf = ByteBuffer.allocate(1000);

                                    int count = client.read(bf);

                                    System.out.println("-->" +

                                        new String(bf.array(), "UTF-8"));

                                    // bf.flip();

                                    key.cancel();

                                } else if (key.isWritable()) {

                                    //  

                                }

                            }

                        }

                    } catch (IOException e) {

                        e.printStackTrace();

                    } finally { /* close selector and ServerSocketChannel*/

                    }

                }

            }).start();

    }