nioシンプルなreactorモデルserverを実現しました。

8111 ワード

nio関連のクラスと方法
ブザー
nioのbufferは本質的には、メモリ領域の一つです。Bufferクラスにカプセル化され、データを読み書きするための方法を提供します。
  • 読み書きデータは、次の4つのステップに分けられます。
  • バffer
  • にデータを書き込みます。
  • flipを起動する()方法
  • データをバッファローから読み出す
  • clear()方法またはcomppact()方法を呼び出す

  • バfferにデータを書き込む時、バfferはどれぐらいのデータを書きましたか?データを読み込むには、flip()法によりBufferを書き込みモードから読み出しモードに切り替える必要があります。読み出しモードでは、読出し前にブザーに書き込まれたすべてのデータが読み込めます。
    すべてのデータを読み終わると、バッファをクリアして、もう一度書き込みできるようにします。バッファをクリアできる方法は二つあります。clear()またはcompect()を呼び出す方法です。clear()メソッドはバッファ全体をクリアします。compect()メソッドは既読のデータだけをクリアします。未読のデータはいずれもバッファの先頭に移動します。新たに書き込まれたデータはバッファ未読のデータの後ろに配置されます。
  • ブザーの割り当て
  • ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    
  • ブザーのタイプ
  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • Double Burer
  • Float Buffer
  • IntBuffer
  • LongBuffer
  • ShotBuffer
  • bufferにデータを書き込む方法は、ブザーを呼び出すput方法であり、一つはチャンネルから取得する方法である。
  • byteBuffer.put(new byte[127]);
    //channel read buffer  
    clientChannel.read(byteBuffer);
    
  • bufferはデータを読む前にflip()を呼び出し、読み出しモードに切り替えます。一つはbufferを呼び出すget方法であり、一つはchanelに
  • を書き込むことである。
    //           ,               
    byteBuffer.flip();
    byteBuffer.get();
    clientChannel.write(byteBuffer);
    
  • clear()とcompect()の方法でBufferのデータを読み終えて、Bufferに準備してもらう必要があれば再度書き込みされます。clear()またはcomppact()の方法で完成できます。
  • もしBufferの中に未読のデータがあったら、clear()を呼び出して、データは“忘れられます”となります。どのデータが読みましたか?まだ何も教えてくれません。
    Bufferに未読のデータがあり、その後もこれらのデータが必要である場合、先にデータを書きたい場合はcompct(compect)を使用します。
    プロジェクト
  • selectの作成
  • Selector selector = Selector.open();
    
  • select()メソッドselect方法はブロックされます。
    reactorモデルはnioの上に(1)メインスレッド(Accept)があります。読み書きイベントを受信します。具体的な処理スレッドは読み書き要求を処理します。(2)キューの概念を導入して、例えばリクエストイベントをキューに入れて、プロcessは消費キューに行って、書き込み処理をします。*
    やはりecho機能を例にします。事件のシンプルなレクチャーモデル
    package com.lxqn.jiapeng.reactorO;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Set;
    
    /**
     *     Accept      
     * Created by jiapeng on 2017/10/7.
     */
    public class NIOServer {
    
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9998));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            int coreNum = Runtime.getRuntime().availableProcessors();
            Processor[] processors = new Processor[coreNum];
            for (int i = 0; i < processors.length; i++) {
                processors[i] = new Processor();
            }
            int index = 0;
            while (true) {
                int n=selector.select(1000);
                if (n == 0) {
                    System.out.print(".");
                    continue;
                }
                System.out.println("n="+n);
    
                if(n>0){
                    Set keys = selector.selectedKeys();
                    for (SelectionKey key : keys) {
                        keys.remove(key);
                        if (key.isAcceptable()) {
                            ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
                            SocketChannel socketChannel = acceptServerSocketChannel.accept();
                            socketChannel.configureBlocking(false);
                            System.out.println("Accept request from {}" + socketChannel.getRemoteAddress());
                            Processor processor = processors[(int) ((index++) % coreNum)];
                            processor.addChannel(socketChannel);
                            processor.wakeup();
                        }
                    }
                }
    
            }
        }
    }
    
    package com.lxqn.jiapeng.reactorO;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     *       IO  
     * Created by jiapeng on 2017/10/7.
     */
    public class Processor {
        //     ,      2 
        private static final ExecutorService service =
                Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
        private Selector selector;
    
        //     ,
        protected ConcurrentLinkedQueue toWrite;
    
        public Processor() throws IOException {
            this.selector = SelectorProvider.provider().openSelector();
            toWrite=new ConcurrentLinkedQueue();
            start();
        }
        public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
            socketChannel.register(this.selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
        }
        public void wakeup() {
            this.selector.wakeup();
        }
        public void start() {
            service.submit(() -> {
                while (true) {
                    int n=selector.select(1000);
                    if (n == 0) {
                        continue;
                    }
                    if(n>0){
                        System.out.println("process n="+n);
                        Set keys = selector.selectedKeys();
                        Iterator iterator = keys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey key = iterator.next();
                            iterator.remove();
                            if (key.isReadable()) {
                                ByteBuffer buffer = (ByteBuffer) key.attachment();
                                SocketChannel socketChannel = (SocketChannel) key.channel();
                                int count = socketChannel.read(buffer);
                                System.out.println("count="+count);
                                if (count < 0) {
                                    socketChannel.close();
                                    key.cancel();
                                    System.out.println("Read ended"+socketChannel);
                                    continue;
                                } else if (count == 0) {
                                    System.out.println("Message size is 0"+socketChannel);
                                    continue;
                                } else {
                                    System.out.println("Read message"+socketChannel+" message="+new String(buffer.array()));
                                    key.interestOps(SelectionKey.OP_WRITE);
                                }
                            }
                            if (key.isWritable()) {
                                System.out.println("start write");
                                toWrite.offer(key);
                            }
                            //         
                            SelectionKey writableKey=null;
                            while((writableKey=toWrite.poll())!=null) {
                                System.out.println("toWrite operation");
                                write(writableKey);
                            }
                        }
                    }
                }
            });
        }
    
        private void write(SelectionKey key) throws IOException{
            ByteBuffer buf = (ByteBuffer) key.attachment();
            //make buffer ready for read
            buf.flip();
            SocketChannel clientChannel = (SocketChannel) key.channel();
            clientChannel.write(buf);
            if (!buf.hasRemaining()) {
                //        ,  echo   
                key.interestOps(SelectionKey.OP_READ);
            }
            //  buffer,        buffer
            buf.compact();
            System.out.println("toWrite"+buf.toString()+" message="+new String(buf.array()));
        }
    
    }