Netty 4.0 Androidでの使用、長接続断線自動再接続
12299 ワード
原文アドレスは最近Netty非同期通信フレームワークを使用しており、使用中にネットワークが切断されたときに検出できない現象が発生することが分かった.
長いリンクの切断に影響する原因は主に3種類ある:1.サービス停止、2.ローカルネットワークケーブルが切断されました.3.パブリックネットワークまたはローカルエリアネットワークでスイッチが切断されました.
使用中にサービスが停止したりローカルネットワークが切断されたりしたときにnettyの@ChannelHandlerのchannelInactiveが呼び出されることがわかりましたが、パブリックネットワークやローカルエリアネットワークスイッチが直接ネットワークが切断された場合、すぐにchannelInactiveのコールバックを受け取ることはできません.IdleStateHandler関数でコールバックを設計しました心拍数データを受信するたびに遅延送信の関数を書き、遅延心拍数時間に心拍数を送信します.
もういいからコードをつけましょう.本当に書けません.
NettyClientBootstrap androidクライアント起動クラス
長いリンクの切断に影響する原因は主に3種類ある:1.サービス停止、2.ローカルネットワークケーブルが切断されました.3.パブリックネットワークまたはローカルエリアネットワークでスイッチが切断されました.
使用中にサービスが停止したりローカルネットワークが切断されたりしたときにnettyの@ChannelHandlerのchannelInactiveが呼び出されることがわかりましたが、パブリックネットワークやローカルエリアネットワークスイッチが直接ネットワークが切断された場合、すぐにchannelInactiveのコールバックを受け取ることはできません.IdleStateHandler関数でコールバックを設計しました心拍数データを受信するたびに遅延送信の関数を書き、遅延心拍数時間に心拍数を送信します.
もういいからコードをつけましょう.本当に書けません.
NettyClientBootstrap androidクライアント起動クラス
//netty
public class NettyClientManager{
private String host; //ip
private int port; //
private EventLoopGroup group;//EventLoop
private Bootstrap b;
private Channel ch;
private ScheduledExecutorService executorService;
// N
private static final int RE_CONN_WAIT_SECONDS = 5;
// ,
private static final int WRITE_WAIT_SECONDS = 7;
//
private boolean isStop = false;
private final String TAG = "NettyClientBootstrap";
//
private ITCPStateListener mStateListener;
//handler
private NettyClientHandler mNettyClientHandler;
private boolean isOnline =false;
public NettyClientBootstrap(String host, int port) {
this.host = host;
this.port = port;
group = new NioEventLoopGroup();
b = new Bootstrap();
mNettyClientHandler = new NettyClientHandler(mListener);
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
*
*
**/
ChannelPipeline pipeline = ch.pipeline();
// Decoders
pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(1024 * 1024 *
1024));
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
// Encoder
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("lineEncoder", new LineEncoder(LineSeparator.UNIX,
CharsetUtil.UTF_8));
// IdleStateHandler userEventTriggered
pipeline.addLast("ping", new IdleStateHandler(WRITE_WAIT_SECONDS,WRITE_WAIT_SECONDS, WRITE_WAIT_SECONDS, TimeUnit.SECONDS));
//
pipeline.addLast("handler", mNettyClientHandler);
}
});
}
//
public void onStart() {
new Thread() {
@Override
public void run() {
connServer();
super.run();
}
}.start();
}
//
public void onStop() {
isStop = true;
if (ch != null && ch.isOpen()) {
ch.close();
}
if (executorService != null) {
executorService.shutdown();
}
}
//
private void connServer() {
Log.e(TAG, "connServer ServerIP = " + IStatic.ServerIP + " ;tcpPort = " + IStatic.tcpPort);
isStop = false;
if (executorService != null) {
executorService.shutdown();
}
// ( ) ,
executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(new Runnable() {
boolean isConnSucc = true;
@Override
public void run() {
try {
//
if (ch != null && ch.isOpen()) {
Log.e(TAG, " ch != null && ch.isOpen() ");
ch.close();
}
ch = b.connect(host, port).sync().channel();
//
// ch.closeFuture().sync();
System.out.println("connect server finish");
Log.e(TAG, "connect server finish");
} catch (Exception e) {
e.printStackTrace();
Log.e(TAG, e.toString());
isConnSucc = false;
} finally {
System.out.println("executorService.shutdown before");
Log.e(TAG, "executorService.shutdown before isConnSucc = " + isConnSucc);
if (isConnSucc) {
if (executorService != null) {
executorService.shutdown();
}
}
System.out.println("executorService.shutdown after");
Log.e(TAG, "connect server finish isConnSucc = " + isConnSucc);
}
}
}, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);
}
// NettyClientHandler
private ITCPStateListener mListener = new ITCPStateListener() {
@Override
public void online() {
// handler , main ,
if(!isOnline)
{
if (mStateListener != null)
mStateListener.online();
new Thread(){
@Override
public void run() {
super.run();
// executorService ,
if(!executorService.isShutdown())
{
executorService.shutdown();
Log.e(TAG,"executorService is not Shutdown");
}else {
Log.e(TAG,"executorService is Shutdown");
}
}
}.start();
}
isOnline=true;
}
@Override
public void offline() {
if(isOnline)
{
if (mStateListener != null)
mStateListener.offline();
}
isOnline=false;
if (!isStop) {
new Thread() {
@Override
public void run() {
super.run();
try {
sleep(5 * 1000);
/*
*
*/
connServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
};
//
public void sendMsg(String msg)
{
mNettyClientHandler.sendMsg(msg);
}
//
public void setStateListener(ITCPStateListener stateListener) {
mStateListener = stateListener;
}
//
public void setInfoListener(ITCPInfoListener infoListener) {
mNettyClientHandler.setInfoListener(infoListener);
}
}
Nettyクライアントのエントリプログラムが完了したら、次はより主要なhandler実装プログラムを完了します.@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler {
private ITCPStateListener mStateListener;
private final int HEART_FRESH_TIME = 5000;//
private boolean isonline = false;
private String HEART_FRESH = "";
private final String TAG = "NettyClientHandler";
private ChannelHandlerContext mctx;
private ITCPInfoListener mInfoListener;
private SFresh mFresh = null;
public NettyClientHandler(ITCPStateListener StateListener) {
mStateListener = StateListener;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
//
MyLogger.e(TAG, "=== === (Reader_IDLE )");
ctx.channel().close();
} else if (event.state() == IdleState.WRITER_IDLE) {
//
MyLogger.e(TAG, "=== === (Reader_IDLE )");
ctx.channel().close();
} else if (event.state() == IdleState.ALL_IDLE) {
//
MyLogger.e(TAG, "=== === (ALL_IDLE )");
ctx.channel().close();
}
}
}
// handler , , ,
// ,
// ,
private Handler mHanler = new Handler(){
@Override
public void handleMessage(Message msg) {
mHanler .removeMessage(0);
//
}
};
@Override
protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
// channelActive ,
if (!isonline) {
mStateListener.online();
isonline = true;
}
String action = JSON.parseObject(message, JsonBean.class).action;
if(action.equls("fresh")){
// ,
mHanler .removeMessageDelay(0,5000);
// 5s ,
// ,
}
// , , json , fastjson ,
}
//
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active ");
MyLogger.e(TAG, "Client active ");
this.mctx = ctx;
super.channelActive(ctx);
setHeartInfo(1);
// 2s
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(l->{
sendHeartData();
},e->{
MyLogger.e(TAG, "=== === e = "+e.toString());
ctx.channel().close();
});
}
// , close
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client close ");
MyLogger.e(TAG, "Client close ");
super.channelInactive(ctx);
isonline = false;
mctx.close();
this.mctx = null;
mHeartTimer.cancel();
if (mStateListener != null)
mStateListener.offline();
}
//
public boolean sendMsg(String message) {
if (mctx != null) {
try {
MyLogger.e(TAG, "sendMsg = " + mctx.isRemoved());
MyLogger.e(TAG, "sendMsg = " + message);
mctx.channel().writeAndFlush(message);
return true;
} catch (Exception e) {
return false;
}
}
return false;
}
//
private void sendHeartData() {
try {
MyLogger.e(TAG, " HeartTask ctx = " + mctx);
MyLogger.e(TAG, " HeartTask HEART_FRESH = " + HEART_FRESH);
if (mctx != null) mctx.channel().writeAndFlush(HEART_FRESH);
if (mFresh.getLogin() == 1) {
setHeartInfo(0);
}
} catch (Exception e) {
MyLogger.e(TAG, " sendHeartData e = " + e.toString());
}
}
//
private void setHeartInfo(int first) {
if (mFresh == null) {
mFresh = new SFresh();
mFresh.setIp(IStatic.IP);
mFresh.setMac(IStatic.MAC);
}
mFresh.setLogin(first);
HEART_FRESH = JSON.toJSONString(mFresh);
}
//
public void setInfoListener(ITCPInfoListener infoListener) {
mInfoListener = infoListener;
}
}
初めて書くのも、主にコードを貼って、みんなにチェックしてもらうことができて、その中にある可能性のあるバグや不足点を読むことができることを望んでいます.またrxjava非同期メカニズムを強くお勧めします.書かれたコードは本当にきれいです.