JAva nio&nettyシリーズの3つのnettyネットワークモデルコードと簡略化版コードの例
22276 ワード
前の章でrecatorモデルについてたくさん話しましたが、nettyではいったいどのようにこのモデルを使っているのでしょうか.
1、nettyのサーバー側ネットワークモデル
個人的にnettyソースコードを読む場合、nettyのサーバ側が最も簡単なrecatorネットワークモデル、すなわち単一スレッドのrecatorモデルを使用していると考えられます.本明細書に関連するnettyソースバージョンは3.6.5であり、主にsocketプロトコルのnioを分析する.
1.1、nettyサーバー側の実際のモデル設計
NettyサーバコードにはNioServerBossPoolというクラスもあります.一見Nettyのサーバ側ネットワークアーキテクチャもMainReactorとSubReactorモードであるべきですが、サーバ側が複数のポートをバインドしている場合にのみNioServerBossPoolクラスが真の役割を果たすことがわかります.次にコードを追跡します.
ServerBootstrapのbindAsyncメソッドのgetFactory().newChannel->
NioServerSocketChannelFactoryのnewChannelメソッドには、return new NioServerSocketChannel(this,pipeline,sink,bossPool.nextBoss()workerPool)というコードが表示されます.
NioServerSocketChannelが1つのNioServerBossクラスにしか対応していないことがわかるので、サーバ側のネットワークモデルが最も簡単なReactorモデルだと結論しました.実際には理解に難くありませんが、subReactorは接続の確立と接続の割り当てを担当するために使われていますが、サーバー側ではmainReactorが接続の確立の仕事をsubReactorに割り当てることはできません.サーバーは接続を確立してこそ新しい接続があることを知っています.サーバーは受動的なので、クライアントだけがmainReactorとsubReactorモデルをよく実行することができます.くだらないことを言い終えて、本題に入り続ける.
1.2、nettyサーバー側コード類説明
Netty 3.6.5バージョンではコードを大面積に再構築し、クラスファイルの命名も明らかになった.
Nettyのサーバ側のnioのrecatorは、org.jboss.netty.channel.socket.nio.NioServer Boss.java
Nettyのサーバ側のnioのワークスレッドはorg.jboss.netty.channel.socket.nio.NioWorker.javaです
nettyのワークスレッドは、サーバ側とクライアント側で共通です.
1.2.1、NioServer Boss機能方法 bind:主にサーバ側が起動するときにポート をバインドする run:NioServerBossはまずスレッドクラスであり、processメソッド を呼び出し続けます. process:主にSelectorを巡って新しいリクエストリンクが入っているかどうかを確認し、ある場合はリンクを確立し、registerAcceptedChannelを通じてリンクをワークスレッド(NioWorker)に登録します. registerAcceptedChannel:リンクをNioWorkerの1つに割り当てる責任を負います.
1.2.2、NioWorker作業スレッドの機能概要
NioWorkerの核心処理プロセスは実はその親AbstractNioWorkerの中で、主にprocess方法でその管理のリンクを処理して、その中でreadはサブクラスが具体的に実現する必要があって、nettyはいくつかの巧みな方法で読取過程を最適化してある程度性能を向上させて、本文はしばらくこの読取過程を分析しないで、まず大きな枠組みから描きます.
もちろんNioServerBossもNioWorkerも、新しいリンクを個別のスレッドで追加します.NioServerBossのbindメソッドは実はNioServerBossのRegisterTaskによって完成したもので、nettyがこのメカニズムを使ったのはCPUの使用率を高めるためだと思いますか?
1.3、nettyのサーバー側ネットワークモデルを模擬する
nettyのサーバ側ネットワークモデルをより深く理解するためには、コードで記述するのが最善の方法であり、できるだけ簡単なコードであるため、nettyのコードを参考にして、比較的簡単なモデルコードを抽出して以下のようにします.
コードを表示する前に、これらのクラスを大まかに説明します.
1、NioServices:サーバー側起動クラス、startUpはいくつかの初期化作業を行い、reactorつまりボススレッドをバインドする
2、Boss:NioServiceの内部クラス、1つのスレッド、絶えずSelectorをポーリングして要求の接続を処理して、接続が創立した後に指定のworkスレッドに割り当てて処理します
3、Work:読み終わって、業務ロジックを処理して、書く機能.
4、WorkPool:workスレッドのプールです.Bossスレッドはこのプールからワークスレッドの1つを取得します.
5、SimpleHandler:nettyをシミュレートするhandlerは、ビジネスロジックを処理するために使用され、中には一時的に2つの方法があり、1つはmessageReceivedであり、workスレッドデータの読み取りが完了した後にトリガーされ、1つは channelConnectedは、workスレッドが接続を登録するときにトリガーする(接続を登録するときは接続が確立したとき).
6、DefaultSimpleHandler:勝手に一つの業務処理ロジッククラスを実現し、SimpleHandlerを実現する.
recatorクラスとサーバ側起動クラスが一緒に書かれています.
ワーククラス
ワークプールクラス
handlerインタフェースおよび簡単なデフォルト実装クラス
2、nettyのクライアントネットワークモデル
2.1、nettyのクライアントモデルクラス説明
NettyのクライアントネットワークモデルはmainRecotorとsubRecotorモードを用い,mainReactorは接続と割り当て接続の確立を担当し,subRecotorはデータの読み書きとビジネスロジック処理を担当する. mainReactor:org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.conctメソッド subReactor:org.jboss.netty.channel.socket.nio.NioClientBoss.java subRecatorPool:org.jboss.netty.channel.socket.nio.NioClientBossPool.java workerスレッド:org.jboss.netty.channel.socket.nio.NioWorker.java NioClientSocketPipelineSinkは接続の最終処理を確立する終端クラスであり、次の記事ではnetty全体のアーキテクチャを分析する際に、NioclientSocketPipelineSinkが作成する際にNioClientBossPoolが要求され、デフォルトではNioClientBossスレッドが1つしかありません.NioClientSocketPipelineSinkのconnectメソッドを呼び出すと、nextBossによってNioClientBossが取得され、接続の作成と割り当てが具体的に処理されます.これがNioClientBossが主に行う作業です.
NioClientBossの主な論理はやはりその親(AbstractNioSelector)が完成し、NioClientBossのメソッド呼び出しロジックは以下の通りである.
process->processSelectedKeys->connect、connectメソッドのch.worker.registerは、指定したworkerスレッドに割り当てられた接続を実行することを意味します.
2.2、nettyのクライアントモデルの簡略化コード例
次の簡略化モデルでは、NioClientクラスはmainReacator、NioClientの内部クラスBossはsubReacator、workerスレッドは依然としてワークスレッドである.またクライアントが接続を確立するのは非同期モードであるため、接続を作成する前にデータを書くことができないため、netty方式に基づいてChannelFutureを構築して処理する.さらにFutureListinerをChannelFuture接続確立後のコールバッククラスとして使用する. NioClient:クライアントクラス、mainReacatoryモードとしても機能します.2つのconnectメソッドがあります.パラメータのないconnectメソッドは、デフォルトのChannelFutureを返します.パラメータのあるconnectは、ChannelFutureにリスナーを追加し、ChannelFutureが完了した後にデータを書き込む などの操作をトリガーすることができます. ChannelFuture:クライアントでmainReactorとsubReactorモデルを使用しているため、接続の作成は非同期のプロセスであるため、接続が完了するまでリスナーを追加してイベントをトリガすることもできるし、ChannelFutureのwaitUntilSuccessを呼び出して接続が作成されるまでブロックすることもできる. FutureListiner:ChannelFutureのリスナー、中には簡単なcallBack方法しかありません.これは最も簡単なリスナーです. Boss:SubReactorクラス、接続を確立し、そこに割り当てられたworkスレッドを処理するために使用される. .
NioClientクラス:
ChannelFutureクラス
FutureListinerクラス
DefaultClientSimpleHandlerクラス:
添付ファイルはすべてのソースファイルです.
1、nettyのサーバー側ネットワークモデル
個人的にnettyソースコードを読む場合、nettyのサーバ側が最も簡単なrecatorネットワークモデル、すなわち単一スレッドのrecatorモデルを使用していると考えられます.本明細書に関連するnettyソースバージョンは3.6.5であり、主にsocketプロトコルのnioを分析する.
1.1、nettyサーバー側の実際のモデル設計
NettyサーバコードにはNioServerBossPoolというクラスもあります.一見Nettyのサーバ側ネットワークアーキテクチャもMainReactorとSubReactorモードであるべきですが、サーバ側が複数のポートをバインドしている場合にのみNioServerBossPoolクラスが真の役割を果たすことがわかります.次にコードを追跡します.
ServerBootstrapのbindAsyncメソッドのgetFactory().newChannel->
NioServerSocketChannelFactoryのnewChannelメソッドには、return new NioServerSocketChannel(this,pipeline,sink,bossPool.nextBoss()workerPool)というコードが表示されます.
NioServerSocketChannelが1つのNioServerBossクラスにしか対応していないことがわかるので、サーバ側のネットワークモデルが最も簡単なReactorモデルだと結論しました.実際には理解に難くありませんが、subReactorは接続の確立と接続の割り当てを担当するために使われていますが、サーバー側ではmainReactorが接続の確立の仕事をsubReactorに割り当てることはできません.サーバーは接続を確立してこそ新しい接続があることを知っています.サーバーは受動的なので、クライアントだけがmainReactorとsubReactorモデルをよく実行することができます.くだらないことを言い終えて、本題に入り続ける.
1.2、nettyサーバー側コード類説明
Netty 3.6.5バージョンではコードを大面積に再構築し、クラスファイルの命名も明らかになった.
Nettyのサーバ側のnioのrecatorは、org.jboss.netty.channel.socket.nio.NioServer Boss.java
Nettyのサーバ側のnioのワークスレッドはorg.jboss.netty.channel.socket.nio.NioWorker.javaです
nettyのワークスレッドは、サーバ側とクライアント側で共通です.
1.2.1、NioServer Boss機能方法
1.2.2、NioWorker作業スレッドの機能概要
NioWorkerの核心処理プロセスは実はその親AbstractNioWorkerの中で、主にprocess方法でその管理のリンクを処理して、その中でreadはサブクラスが具体的に実現する必要があって、nettyはいくつかの巧みな方法で読取過程を最適化してある程度性能を向上させて、本文はしばらくこの読取過程を分析しないで、まず大きな枠組みから描きます.
もちろんNioServerBossもNioWorkerも、新しいリンクを個別のスレッドで追加します.NioServerBossのbindメソッドは実はNioServerBossのRegisterTaskによって完成したもので、nettyがこのメカニズムを使ったのはCPUの使用率を高めるためだと思いますか?
1.3、nettyのサーバー側ネットワークモデルを模擬する
nettyのサーバ側ネットワークモデルをより深く理解するためには、コードで記述するのが最善の方法であり、できるだけ簡単なコードであるため、nettyのコードを参考にして、比較的簡単なモデルコードを抽出して以下のようにします.
コードを表示する前に、これらのクラスを大まかに説明します.
1、NioServices:サーバー側起動クラス、startUpはいくつかの初期化作業を行い、reactorつまりボススレッドをバインドする
2、Boss:NioServiceの内部クラス、1つのスレッド、絶えずSelectorをポーリングして要求の接続を処理して、接続が創立した後に指定のworkスレッドに割り当てて処理します
3、Work:読み終わって、業務ロジックを処理して、書く機能.
4、WorkPool:workスレッドのプールです.Bossスレッドはこのプールからワークスレッドの1つを取得します.
5、SimpleHandler:nettyをシミュレートするhandlerは、ビジネスロジックを処理するために使用され、中には一時的に2つの方法があり、1つはmessageReceivedであり、workスレッドデータの読み取りが完了した後にトリガーされ、1つは channelConnectedは、workスレッドが接続を登録するときにトリガーする(接続を登録するときは接続が確立したとき).
6、DefaultSimpleHandler:勝手に一つの業務処理ロジッククラスを実現し、SimpleHandlerを実現する.
recatorクラスとサーバ側起動クラスが一緒に書かれています.
package org.ben.nio.multithread;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* @Auth ben.s
* @Email [email protected]
* @Date 2013-2-22
*
*
*/
public class NioService {
// CPU, Boss
private static WorkerPool workerPool = new WorkerPool();
//
public void startUp() {
try {
//
ServerSocketChannel serverSocketChannel = ServerSocketChannel
.open();
// , sokcet
serverSocketChannel.socket().bind(
new InetSocketAddress(InetAddress.getLocalHost(), 4444));
List handlers = new ArrayList();
handlers.add(new DefaultSimpleHandler());
workerPool.init(handlers);
Boss boss = new Boss(serverSocketChannel);
boss.start();
System.out.println(" ");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Recator , , netty bind , , 。
*
* run netty NioServerBoss process 。
* registerAcceptedChannel worker 。
* @author shao
*
*/
private final class Boss extends Thread {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
Boss(ServerSocketChannel serverSocketChannel) throws IOException {
this.selector = Selector.open();
this.serverSocketChannel = serverSocketChannel;
boolean registered = false;
try {
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector,
SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
closeSelector();
}
}
}
private void closeSelector() {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
// TODO Auto-generated method stub
for (;;) {
try {
this.selector.select(1000);
Set selectedKeys = this.selector.selectedKeys();
if (selectedKeys.isEmpty()) {
continue;
}
for (Iterator i = selectedKeys.iterator(); i
.hasNext();) {
SelectionKey k = i.next();
i.remove();
for (;;) {
SocketChannel acceptedSocket = this.serverSocketChannel
.accept();
if (acceptedSocket == null) {
break;
}
registerAcceptedChannel(acceptedSocket);
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
private void registerAcceptedChannel(SocketChannel acceptedSocket) {
try {
workerPool.getReader().register((SocketChannel) acceptedSocket);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] ben) {
NioService ns = new NioService();
ns.startUp();
}
}
ワーククラス
package org.ben.nio.multithread;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
public class Worker extends Thread {
private List handlers;
// selector
private Selector selector;
private int id;
private final AtomicBoolean wakenUp = new AtomicBoolean();
public Worker(List handlers, int id) throws IOException {
this.handlers = handlers;
this.id = id;
this.selector = (this.selector == null) ? Selector.open()
: this.selector;
}
public void register(SocketChannel socketChannel) throws IOException {
if (socketChannel == null || socketChannel.isOpen()) {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
for (SimpleHandler handler : handlers) {
handler.channelConnected(id, socketChannel, this);
}
// 。
// select() select(long) , 。
// , selectNow() , 。
// , 。 , select() select(long)
// 。
// 。
// wakeup, , wakeup , run
// if(wakenup.get())this.selector.wakeup() , channel , .
// A,B,C,D
// A this.selector.wakeup()
// B this.selector.wakeup(), A wakeup
// C D wakeup 。
// A B ,C D this.selector.select(500) 500ms ,
// if(wakenup.get())this.selector.wakeup() ( selector , ),
// C D 500ms , 。 C D , wakeup false, selector
if (wakenUp.compareAndSet(false, true)) {
this.selector.wakeup();
}
}
}
public void run() {
while (true) {
wakenUp.set(false);
try {
int selectNum = this.selector.select(500);
if (selectNum > 0) {
Set keys = this.selector.selectedKeys();
// , .
if (wakenUp.get()) {
this.selector.wakeup();
}
for (Iterator i = keys.iterator(); i
.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0
|| readyOps == 0) {
if (!read(k)) {
// Connection already closed - no need to
// handle write.
continue;
}
}
/**
* OP_WRITER , , netty
* , , OP_WRITE 。
*/
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
k.cancel();
} catch(IOException e){
System.out.println(" !");
k.cancel();
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* , 1024 , , , , , .
*
* @param key
* @return
* @throws IOException
*/
private boolean read(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer data = ByteBuffer.allocate(1024);
sc.read(data);
/**
* handlers messageReceived
* ,netty handler , , 。
*/
for (SimpleHandler handler : handlers) {
handler.messageReceived(id, data, sc, this);
}
return true;
}
//
private void writeFromSelectorLoop(SelectionKey k) {
// TODO Auto-generated method stub
}
//
private void write0(SocketChannel sc, ByteBuffer msg) throws IOException {
sc.write(msg);
}
//
public void writeFormUserCode(SocketChannel sc, ByteBuffer msg) {
try {
write0(sc, msg);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
ワークプールクラス
package org.ben.nio.multithread;
import java.util.ArrayList;
import java.util.List;
public class WorkerPool {
private static List workList = null;
private int workNum = 5;
private int nextId = 0;
public void init(List handlers) {
workList = new ArrayList();
for (int i = 0; i < workNum; i++) {
try {
Worker worker = new Worker(handlers, i);
workList.add(worker);
worker.start();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
public Worker getReader() {
return workList.get(nextId++ % 5);
}
}
handlerインタフェースおよび簡単なデフォルト実装クラス
package org.ben.nio.multithread;
import java.nio.channels.SocketChannel;
/**
*
*
* @Auth ben.s
* @Email [email protected]
* @Date 2013-2-23
*/
public interface SimpleHandler {
public void messageReceived(int id,Object param,SocketChannel sc,Worker worker);
public void channelConnected(int id,SocketChannel sc,Worker worker);
}
package org.ben.nio.multithread;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class DefaultSimpleHandler implements SimpleHandler {
@Override
//netty SocketChannel worker, , writer ,
//netty socketChannel.write(msg);
public void messageReceived(int id,Object param, SocketChannel sc,Worker worker) {
System.out.println("["+System.currentTimeMillis()+"] ("+id+"):"+new String(((ByteBuffer)param).array()));
worker.writeFormUserCode(sc, ByteBuffer.wrap((" work "+id+"thanks to access!").getBytes()));
}
@Override
public void channelConnected(int id, SocketChannel sc,
Worker worker) {
// TODO Auto-generated method stub
}
}
2、nettyのクライアントネットワークモデル
2.1、nettyのクライアントモデルクラス説明
NettyのクライアントネットワークモデルはmainRecotorとsubRecotorモードを用い,mainReactorは接続と割り当て接続の確立を担当し,subRecotorはデータの読み書きとビジネスロジック処理を担当する.
NioClientBossの主な論理はやはりその親(AbstractNioSelector)が完成し、NioClientBossのメソッド呼び出しロジックは以下の通りである.
process->processSelectedKeys->connect、connectメソッドのch.worker.registerは、指定したworkerスレッドに割り当てられた接続を実行することを意味します.
2.2、nettyのクライアントモデルの簡略化コード例
次の簡略化モデルでは、NioClientクラスはmainReacator、NioClientの内部クラスBossはsubReacator、workerスレッドは依然としてワークスレッドである.またクライアントが接続を確立するのは非同期モードであるため、接続を作成する前にデータを書くことができないため、netty方式に基づいてChannelFutureを構築して処理する.さらにFutureListinerをChannelFuture接続確立後のコールバッククラスとして使用する.
NioClientクラス:
package org.ben.nio.multithread;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
public class NioClient {
private static int bossCount = 2;
private static AtomicLong count = new AtomicLong(0);
private static Boss[] bossPool = null;
private static WorkerPool workerPool = null;
static {
List handlers = new ArrayList();
handlers.add(new DefaultClientSimpleHandler("hello world!"));
if (workerPool == null) {
workerPool = new WorkerPool();
workerPool.init(handlers);
}
if (bossPool == null) {
bossPool = new Boss[bossCount];
for (int i = 0; i < 2; i++) {
Boss boss = new Boss(i);
boss.start();
bossPool[i] = boss;
}
}
}
private Boss nextBoss() {
Boss boss = bossPool[(int) count.longValue() % 2];
count.incrementAndGet();
return boss;
}
public ChannelFuture connect() {
ChannelFuture future = new ChannelFuture();
return this.connect(future);
}
public ChannelFuture connect(FutureListiner listiner) {
ChannelFuture future = new ChannelFuture();
future.addListiner(listiner);
return this.connect(future);
}
private ChannelFuture connect(ChannelFuture future) {
SocketChannel socketChannel;
try {
socketChannel = SocketChannel.open();
future.setSocketChannel(socketChannel);
nextBoss().registor(future);
return future;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
static class Boss extends Thread {
private SocketChannel socketChannel;
private ChannelFuture future;
private Selector selector = null;
private int id;
public Boss(int id) {
try {
this.selector = (this.selector == null) ? Selector.open()
: this.selector;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.id = id;
}
public void registor(ChannelFuture future) {
this.socketChannel = future.getSocketChannel();
this.future = future;
try {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress(InetAddress
.getLocalHost(), 4444));
this.selector.wakeup();
System.out.println("BOSS[" + this.id + "] connect ");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void run() {
for (;;) {
process();
}
}
private void process() {
int selectNum;
try {
selectNum = this.selector.select(500);
if (selectNum > 0) {
System.out.println("BOSS[" + this.id + "] connect ");
Set keys = selector.selectedKeys();
for (Iterator it = keys.iterator(); it
.hasNext();) {
SelectionKey key = it.next();
it.remove();
if (key.isConnectable()) {
SocketChannel channel = ((SocketChannel) key
.channel());
connect(channel);
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void connect(SocketChannel channel) throws IOException {
// 。
// 。
if (channel.isConnectionPending()) {
channel.finishConnect();
this.future.setSuccess();
workerPool.getReader().register(channel);
}
}
}
public static void main(String[] ben) throws IOException,
InterruptedException {
//
/**
* 4 SocketChannel,
*/
// NioClient nioClient = new NioClient();
// for(int i=0;i<4;i++){
// nioClient.connect();
// }
//
/**
* , , , , ,
* :how are you! :how are you!how are you!how are you!
*
* netty ? , , 。
*/
NioClient nioClient = new NioClient();
ChannelFuture future = nioClient.connect();
// , netty ChannelFuture , 。
System.out.println("before waitUntilSuccess,is connected:"
+ future.getSocketChannel().isConnected());
future.waitUntilSuccess();
System.out.println("after waitUntilSuccess,is connected:"
+ future.getSocketChannel().isConnected());
for (int i = 0; i < 4; i++) {
future.getSocketChannel().write(
ByteBuffer.wrap("how are you!".getBytes()));
}
//
/**
* listiner .
*/
/*
* NioClient nioClient = new NioClient(); ChannelFuture future =
* nioClient.connect(new FutureListiner() { public void
* callBack(ChannelFuture future) { try {
* future.getSocketChannel().write(
* ByteBuffer.wrap("how are you!".getBytes())); } catch (IOException e)
* { // TODO Auto-generated catch block e.printStackTrace(); } } });
*/
}
}
ChannelFutureクラス
package org.ben.nio.multithread;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* netty ChannelFuture
* asyncWriter , , , queue , 。
* @author shao
*
*/
public class ChannelFuture {
public SocketChannel socketChannel;
public boolean isSuccess = false;
private BlockingQueue queue = new LinkedBlockingQueue();
private List futureLintiners = new ArrayList();
public void addListiner(FutureListiner listiner){
this.futureLintiners.add(listiner);
}
public SocketChannel getSocketChannel() {
return this.socketChannel;
}
public void setSocketChannel(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess() {
synchronized (this) {
System.out.println(" !");
if(this.socketChannel.isConnected()) {
this.isSuccess = true;
notifyListiner();
notifyAll();
}
}
}
private void notifyListiner(){
for(FutureListiner listiner : futureLintiners){
listiner.callBack(this);
}
}
public void asyncWrite(ByteBuffer msg) throws InterruptedException {
queue.put(msg);
}
public void waitUntilSuccess() {
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
FutureListinerクラス
package org.ben.nio.multithread;
public interface FutureListiner{
public void callBack(ChannelFuture future);
}
DefaultClientSimpleHandlerクラス:
package org.ben.nio.multithread;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class DefaultClientSimpleHandler implements SimpleHandler{
private String firstData;
public DefaultClientSimpleHandler(String data){
this.firstData = data;
}
@Override
public void messageReceived(int id, Object param, SocketChannel sc,
Worker worker) {
ByteBuffer data = (ByteBuffer) param;
System.out.println(" work 【"+id+"】 :"+new String(data.array()));
}
@Override
public void channelConnected(int id, SocketChannel sc, Worker worker) {
// TODO Auto-generated method stub
ByteBuffer bf = ByteBuffer.wrap(firstData.getBytes());
System.out.println(" !");
worker.writeFormUserCode(sc, bf);
}
}
添付ファイルはすべてのソースファイルです.