nioシンプルなreactorモデルserverを実現しました。
8111 ワード
nio関連のクラスと方法
ブザー
nioのbufferは本質的には、メモリ領域の一つです。Bufferクラスにカプセル化され、データを読み書きするための方法を提供します。読み書きデータは、次の4つのステップに分けられます。 バffer にデータを書き込みます。 flipを起動する()方法 データをバッファローから読み出す clear()方法またはcomppact()方法を呼び出す 。
バfferにデータを書き込む時、バfferはどれぐらいのデータを書きましたか?データを読み込むには、flip()法によりBufferを書き込みモードから読み出しモードに切り替える必要があります。読み出しモードでは、読出し前にブザーに書き込まれたすべてのデータが読み込めます。
すべてのデータを読み終わると、バッファをクリアして、もう一度書き込みできるようにします。バッファをクリアできる方法は二つあります。clear()またはcompect()を呼び出す方法です。clear()メソッドはバッファ全体をクリアします。compect()メソッドは既読のデータだけをクリアします。未読のデータはいずれもバッファの先頭に移動します。新たに書き込まれたデータはバッファ未読のデータの後ろに配置されます。ブザーの割り当て ブザーのタイプ ByteBuffer MappedByteBuffer CharBuffer Double Burer Float Buffer IntBuffer LongBuffer ShotBuffer bufferにデータを書き込む方法は、ブザーを呼び出すput方法であり、一つはチャンネルから取得する方法である。 bufferはデータを読む前にflip()を呼び出し、読み出しモードに切り替えます。一つはbufferを呼び出すget方法であり、一つはchanelに を書き込むことである。 clear()とcompect()の方法でBufferのデータを読み終えて、Bufferに準備してもらう必要があれば再度書き込みされます。clear()またはcomppact()の方法で完成できます。 もしBufferの中に未読のデータがあったら、clear()を呼び出して、データは“忘れられます”となります。どのデータが読みましたか?まだ何も教えてくれません。
Bufferに未読のデータがあり、その後もこれらのデータが必要である場合、先にデータを書きたい場合はcompct(compect)を使用します。
プロジェクト selectの作成 select()メソッドselect方法はブロックされます。
reactorモデルはnioの上に(1)メインスレッド(Accept)があります。読み書きイベントを受信します。具体的な処理スレッドは読み書き要求を処理します。(2)キューの概念を導入して、例えばリクエストイベントをキューに入れて、プロcessは消費キューに行って、書き込み処理をします。*
やはりecho機能を例にします。事件のシンプルなレクチャーモデル
ブザー
nioのbufferは本質的には、メモリ領域の一つです。Bufferクラスにカプセル化され、データを読み書きするための方法を提供します。
バfferにデータを書き込む時、バfferはどれぐらいのデータを書きましたか?データを読み込むには、flip()法によりBufferを書き込みモードから読み出しモードに切り替える必要があります。読み出しモードでは、読出し前にブザーに書き込まれたすべてのデータが読み込めます。
すべてのデータを読み終わると、バッファをクリアして、もう一度書き込みできるようにします。バッファをクリアできる方法は二つあります。clear()またはcompect()を呼び出す方法です。clear()メソッドはバッファ全体をクリアします。compect()メソッドは既読のデータだけをクリアします。未読のデータはいずれもバッファの先頭に移動します。新たに書き込まれたデータはバッファ未読のデータの後ろに配置されます。
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(new byte[127]);
//channel read buffer
clientChannel.read(byteBuffer);
// ,
byteBuffer.flip();
byteBuffer.get();
clientChannel.write(byteBuffer);
Bufferに未読のデータがあり、その後もこれらのデータが必要である場合、先にデータを書きたい場合はcompct(compect)を使用します。
プロジェクト
Selector selector = Selector.open();
reactorモデルはnioの上に(1)メインスレッド(Accept)があります。読み書きイベントを受信します。具体的な処理スレッドは読み書き要求を処理します。(2)キューの概念を導入して、例えばリクエストイベントをキューに入れて、プロcessは消費キューに行って、書き込み処理をします。*
やはりecho機能を例にします。事件のシンプルなレクチャーモデル
package com.lxqn.jiapeng.reactorO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
/**
* Accept
* Created by jiapeng on 2017/10/7.
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9998));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int coreNum = Runtime.getRuntime().availableProcessors();
Processor[] processors = new Processor[coreNum];
for (int i = 0; i < processors.length; i++) {
processors[i] = new Processor();
}
int index = 0;
while (true) {
int n=selector.select(1000);
if (n == 0) {
System.out.print(".");
continue;
}
System.out.println("n="+n);
if(n>0){
Set keys = selector.selectedKeys();
for (SelectionKey key : keys) {
keys.remove(key);
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Accept request from {}" + socketChannel.getRemoteAddress());
Processor processor = processors[(int) ((index++) % coreNum)];
processor.addChannel(socketChannel);
processor.wakeup();
}
}
}
}
}
}
package com.lxqn.jiapeng.reactorO;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* IO
* Created by jiapeng on 2017/10/7.
*/
public class Processor {
// , 2
private static final ExecutorService service =
Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
private Selector selector;
// ,
protected ConcurrentLinkedQueue toWrite;
public Processor() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
toWrite=new ConcurrentLinkedQueue();
start();
}
public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(this.selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
public void wakeup() {
this.selector.wakeup();
}
public void start() {
service.submit(() -> {
while (true) {
int n=selector.select(1000);
if (n == 0) {
continue;
}
if(n>0){
System.out.println("process n="+n);
Set keys = selector.selectedKeys();
Iterator iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
System.out.println("count="+count);
if (count < 0) {
socketChannel.close();
key.cancel();
System.out.println("Read ended"+socketChannel);
continue;
} else if (count == 0) {
System.out.println("Message size is 0"+socketChannel);
continue;
} else {
System.out.println("Read message"+socketChannel+" message="+new String(buffer.array()));
key.interestOps(SelectionKey.OP_WRITE);
}
}
if (key.isWritable()) {
System.out.println("start write");
toWrite.offer(key);
}
//
SelectionKey writableKey=null;
while((writableKey=toWrite.poll())!=null) {
System.out.println("toWrite operation");
write(writableKey);
}
}
}
}
});
}
private void write(SelectionKey key) throws IOException{
ByteBuffer buf = (ByteBuffer) key.attachment();
//make buffer ready for read
buf.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) {
// , echo
key.interestOps(SelectionKey.OP_READ);
}
// buffer, buffer
buf.compact();
System.out.println("toWrite"+buf.toString()+" message="+new String(buf.array()));
}
}