nettyはhttpの長い接続を実現します。
最初はこの長接続httpクライアントを実現する前に非常に簡単だと思っていましたが、実現する過程で、異なるサーバが接続の処理方法についてはまだ違っています。
考え方:
1.クライアントとサーバの前に複数のsocket接続(デフォルト8条)を確立し、要求を送信する時にランダムに一つを選択して送信します。データを送信するときは、各接続にhttp pipelineを使用する方式(一つの要求が送信された後、サーバーがこの要求に応答するのを待つ必要はなく、直接に次の要求を送信する)。ここで一つの問題があります。要求はどんどん発送されます。サーバーは各要求に対する応答をどんどん送ってきます。要求と応答はどのように対応しますか?使用者にとって、彼は要求ごとの応答が何であるかを知る必要がある。nettyを使ってこの問題を解決するコードは以下の通りです。rpollingc-core/src/main/java/com/zhw/rpollingc/http/conn/NettyHttp Handler.java。http仕様によると、同じ接続上の要求は先に送信され、先に応答した要求は、そのレスポンスが先に送られます。作り方はnettyのhandlerの中で、もし一つの要求が成功すれば、それを一つの列の最後に置いて、http応答を受け取ったら、キューヘッドの要求対象はそれに対応する要求内容です。したがって、このハンドルの前に、http reponseを解析するデコーダを配置する必要があります。性能のためにhttpのエンコーダをhandlerに置いていません。handlerの実行はnettyのioスレッドで実行されますので、このhandlerではbytebufとnettylの復号を受けたfullhttpresponseだけを送っています。
2.多くのhttpサーバが一つの接続に一定量の要求を処理した後、この接続をcloseに落とします。例えば、nginx、デフォルトの一つの接続は100個の要求だけ処理します。処理が終わったら、この接続を強制的にクローズします。もちろん、keepalive_を設定することによってもいいです。このパラメータは修正した数を取ります。上記のpipelineの特性を使っていますので、リクエストはどんどん送ります。サーバが100個の要求を処理し終わったら、クライアントは120個の要求を送信したかもしれません。残りの要求はサーバーのバッファにあるか、あるいはクライアントのバッファにあります。サーバーは残りの要求データを読み取って接続を直接オフにして、finパッケージを送ってくれますか?キティちゃんはこのfinパッケージを受け取って、サーバーが接続をオフにしたことを知っています。この時のnettyのやり方は、まだnetty緩衝区で発送されていない要求で、closedChanelの異常通知を受けます。発送済みの要求に対しても、サーバーが直接に破棄しました。サーバの応答は永遠に受け入れられません。ですから、この二つの部分の要求は改めて送らなければなりません。これらの2つの問題を解決する方法は、1)送信済みの要求について、送信が成功したら上記のキューに保存されるため、サーバがレスポンスを送信するのを待っています。接続がオフになったら、キューの中の要求はサービスによって破棄された要求を送信するので、この部分の要求を再送信する必要があります。処理方法は、サーバのすべての接続をリングチェーンにして、このリングチェーンに沿って自分以外のすべてのノードを順番に送ってください。2)nettyキャッシュで未来及びnettyによって送信される要求に対して、nettyは各要求に対してclosedchannel Exceptionに出発する異常がありますので、このような異常が発生したら、現在のリングチェーンの次のノードにも送る必要があります。直接throwを呼び出し者に渡すことはできません。この部分のコードはcom/zhw/rpollingc/http/conn/HttpConnection.javaにあります。
3.サーバーは一定数の要求を受けてから接続を切断しますので、新しいsocketchanelを自動的に再開する必要があります。リングチェーンテーブルの各ノードが保持している接続については、自動的に再接続中である可能性があり、マージンを向上させるために、このノードの接続が再接続中であることが検出された場合は、次のノードに渡して送信を継続する必要があり、順次に複数の点を探した後(デフォルト6つ)、依然として再接続中である場合は、この時にとったソリューションは接続の切断時間を確認し、接続が切断されてからしばらく(設定値により)、サーバが接続できなくなっている可能性があると説明した場合、直接に呼び出し側に異常を投げてください。接続切断時間が長くない場合は、接続が接続された後に送信されるか、または再接続が一定回数後にこの部分の要求に対して異常処理を行うために、1つのキューに保存する必要があります。併発量を高めるために、casを使ってコム/zhw/rpollingc/utils/AtomicArayCollect.javaを書きました。
4.前の期間に接続しても送信要求がない場合、サーバは接続を停止します。ここでは、接続を直接オフにしたり、heartbeatのurlをセットしたりして、空き時間に接続してから心臓の鼓動を送ります。一番核心のHttpConnection.javaのコードを貼ります。
package com.zhw.rpollingc.http.conn;
import com.zhw.rpollingc.common.RpcException;
import com.zhw.rpollingc.http.NettyConfig;
import com.zhw.rpollingc.utils.AtomicArrayCollector;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* auto reconnect http connection.
*
* {@link NettyHttpHandler} use send request with http1.1 and keep alive the real tcp connection.
* it will send request continuously even if the response of last request not yet received,
* but much server will force closing connection after accepted number of http requests,
* even if using http1.1,such as nginx(max_requests=..),tomcat.
* so auto reconnect to the server and resend request is necessary.
*
*
* during reconnecting this class can receive user http requests and entrust next connect to send.
* so the best way is creating the http connection object and use it to be one of the node of cycle linked list.
* and it will resend the requests on http pipeline when the server closed.
* you can random to pick one connection to send request. this way also can increment the throughput
*
*/
public class HttpConnection implements HttpEndPoint, NettyHttpHandler.Listener {
private static final HashedWheelTimer RECONNECT_TIMER = new HashedWheelTimer();
private static final InternalLogger log = Slf4JLoggerFactory.getInstance(HttpConnection.class);
private final Bootstrap bootstrap;
private final long maxWaitingOpenTime;
private final NettyConfig conf;
private final AtomicArrayCollector waitingQueue;
private volatile boolean userClose = false;
HttpConnection next;
private volatile Channel channel;
private long lastCloseTime = -1;
// 128
public HttpConnection(Bootstrap bootstrap, NettyConfig conf) {
this.conf = conf;
waitingQueue = new AtomicArrayCollector<>(conf.getMaxWaitingReSendReq());
maxWaitingOpenTime = conf.getMaxWaitingOpenTimeMs();
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("idle-handler", new IdleStateHandler(0,
0,
conf.getIdleHeartbeatInterval()));
ch.pipeline().addLast("http-codec", new HttpResponseDecoder());
ch.pipeline().addLast("response-body-cumulate", new HttpObjectAggregator(conf.getMaxRespBodyLen(),
true));
ch.pipeline().addLast("http-pipeline-handler", new NettyHttpHandler(HttpConnection.this));
}
});
this.bootstrap = bootstrap;
}
@Override
public void send(HttpRequest request) throws RpcException {
send(request, 0);
}
void send(HttpRequest request, int times) {
Channel channel = this.channel;
if (channel == null) {
long lastCloseTime = this.lastCloseTime;
long now = System.currentTimeMillis();
if (userClose) {
request.onErr(new RpcException("user close connection"));
} else if (times < 6) {
// ,
next.send(request, times + 1);
} else if (lastCloseTime > 0L) {
if (now - lastCloseTime > maxWaitingOpenTime) {
// 。
request.onErr(new RpcException("closed connection"));
} else {
//
int offer = waitingQueue.offer(request);
//
if (offer == 0) {
return;
}
if (offer < 0) {
// null, , ,
request.onErr(new RpcException("to many request waiting closed connection"));
} else {
// collecting,
send(request, times);
}
}
} else if (lastCloseTime == -2) {
// channel == null, lastCloseTime==-2.
//
send(request, times);
} else {
// lastCloseTime==-1
request.onErr(new IllegalStateException("sending after calling connect()"));
}
} else {
doWrite0(channel, request);
}
}
private void doWrite0(Channel channel, HttpRequest request) {
ByteBuf reqByteBuf = request.getReqByteBuf();
// +1, netty
reqByteBuf.retain();
ChannelFuture future = channel.writeAndFlush(request);
future.addListener(f -> {
if (!f.isSuccess()) {
Throwable cause = f.cause();
if (cause instanceof ClosedChannelException) {
// write netty 。 ,
// netty , , times 0,
next.send(request, 0);
} else if (cause != null) {
request.onErr(new RpcException("connection error", cause));
} else {
request.onErr(new RpcException("send occur unkown error"));
}
}
});
}
@Override
public void connect() {
doConnect(false);
}
private void doConnect(boolean asyncAndReconnect)
throws RpcException {
if (userClose) {
throw new IllegalStateException("user closed");
}
ChannelFuture future = bootstrap.connect();
if (asyncAndReconnect) {
future.addListener(f -> {
if (!f.isSuccess()) {
Throwable cause = f.cause();
String errMsg = cause != null ? cause.getMessage() : "UNKWON ERROR";
log.error("connect error with conf " + conf + " cause:" + errMsg);
long now = System.currentTimeMillis();
if (now - lastCloseTime > maxWaitingOpenTime) {
// 。
// , 。
Iterator iterator = waitingQueue.collect();
if (iterator.hasNext()) {
// userClose, null
RpcException exp = new RpcException("closed connection and reconnect failed:" + errMsg);
for (; iterator.hasNext(); ) {
HttpRequest req = iterator.next();
req.onErr(exp);
}
}
} else if (userClose) {
Iterator iterator = waitingQueue.collect();
if (iterator.hasNext()) {
// userClose, null
RpcException exp = new RpcException("waiting util user closed connection");
for (; iterator.hasNext(); ) {
HttpRequest req = iterator.next();
req.onErr(exp);
}
}
return;
}
scheduleReconnect();
}
});
} else {
future.awaitUninterruptibly();
if (future.isSuccess()) {
return;
}
Throwable cause = future.cause();
if (cause != null) {
throw new RpcException("connect failed with conf" + conf, cause);
} else {
throw new RpcException("unkown reason connect failed with conf" + conf);
}
}
}
@Override
public void close() {
userClose = true;
Channel channel = this.channel;
if (channel != null) {
this.channel = null;
channel.close();
}
}
@Override
public void onOpened(Channel ch) {
if (userClose) {
ch.close();
Iterator iterator = waitingQueue.collect();
if (iterator.hasNext()) {
// userClose, null
RpcException exp = new RpcException("user closed connection");
for (; iterator.hasNext(); ) {
HttpRequest req = iterator.next();
req.onErr(exp);
}
}
return;
}
this.channel = ch;
this.lastCloseTime = -2;
Iterator iterator = waitingQueue.collect();
for (; iterator.hasNext(); ) {
doWrite0(ch, iterator.next());
}
}
@Override
public void onClosed(Collection reqs) {
lastCloseTime = System.currentTimeMillis();
if (userClose) {
return;
}
HttpConnection next = this.next;
if (next == null || next == this) {
Iterator iterator = reqs.iterator();
while (iterator.hasNext() && waitingQueue.offer(iterator.next()) < 0) ;
if (iterator.hasNext()) {
RpcException exp = new RpcException("to many req waiting unopened connection");
do {
iterator.next().onErr(exp);
} while (iterator.hasNext());
}
channel = null;
} else {
channel = null;
for (HttpRequest req : reqs) {
next.send(req);
next = next.next;
if (next == this) {
next = next.next;
}
}
}
doConnect(true);
}
private void scheduleReconnect() {
if (userClose) {
return;
}
RECONNECT_TIMER.newTimeout(timeout -> {
if (!timeout.isCancelled()) {
doConnect(true);
}
}, 1000, TimeUnit.MILLISECONDS);
}
}