NettyでRedisを実現(2)
13389 ワード
これはNettyで実現され、マルチスレッドと非同期呼び出し、lock-freeをサポートするJavaRedisクライアントです.
(サーババージョン:https://blog.csdn.net/pp634077956/article/details/82178710)
github: https://github.com/pyb1993/JavaRedisClient
目的1 JavaRedisサーバによるデバッグ
目的2各種非同期コールバック/futureのメカニズムを学習するために用いる
目的3スレッドセキュリティの問題を学習するために使用する
現在実装されている機能:
コードの考え方を見る:
使用方法
解釈:実際には伝統的なコールバックの書き方で、
Version-1アーキテクチャ
Nettyで実装されたクライアントを使用して、スレッドプールを保存し、Nettyオリジナルのコールバックを使用して簡単なパッケージを行います.
大まかな考え方:
接続プール:
同期と非同期:
マルチスレッドについて:
非同期の具体的な実装:
1.まず、接続フェーズの非同期実装です.
2.次に、送信コマンドの実装(sendInternal 2):
説明:
不足:の最大の問題はコールバック地獄 と書くしかない.再接続なし、負荷等化などの機能 読み書きタイムアウトの制御はサーバによって実現されるが,サーバは長リンクの制御であり,クライアント自身にも構成可能な
Version-2改良
1.考え方
Javascript Promiseという呼び出しにヒントを得たので、javaの中で1つ真似しましたが、違いはPromiseがキャンセルできず、最初の論理がすぐに実行されたことです.まずRedisFutureを拡張します.タイプの定義は次のとおりです.
Listenerは以前と似ていて、setSuccessが呼び出されたときにのみ実行を通知されます.この特徴を利用して、listenerの中で次のコールバックが実行を開始することを通知する論理を設定しなければなりません.私はこの関数を
実は従来の
では、
例:
明らかに、
Version 3アーキテクチャ
改善1:
改善2:
構想:チャネルベースのIO多重化とは,pipelineと同様に1つのチャネル上で複数のコマンドを同時に待つメカニズムである.これはhttp 2とは異なり、http 2は本質的に複数のリクエストを乱順に実行することができるが、Javaredis自体は単一スレッドのモデルであり、得られる結果は一般的に非常に短いため、シーケンス実行は十分速い(結果の量が非常に大きく、例えば多くのピクチャのように、N個のリクエストがそれぞれ異なるピクチャを転送することができる)
したがって、requestIdとfutureとのつながりを記録することで、要求が来た後、非同期実行のfutureがコールバックを実行できることを通知することができる.これだけの記録ですが、実現にはかなりの細部が考慮されています.次に、最も重要ないくつかの点についてお話しします.
マルチスレッドの問題を解決し、パフォーマンスとコードロジックの簡単さを保証するために、複雑な状態制御を行わないことを選択しました.
次に、接続プールのポリシーを考慮します.
パフォーマンスの向上:
V 2バージョンに比べて多くの性能(特に非同期操作)が向上し、現在、私のマシン(Mac 2.3 GHz Intel Core i 5、8 GB 2133 MHz LPDDR 3)でサーバとクライアントを同時に実行しています....同期単一スレッドは7500 QPSに達することができる…同期8スレッドは16000 QPSに達することができる…非同期単一スレッド:20000 QPS…非同期4スレッド:23000 QPS
(サーババージョン:https://blog.csdn.net/pp634077956/article/details/82178710)
github: https://github.com/pyb1993/JavaRedisClient
目的1 JavaRedisサーバによるデバッグ
目的2各種非同期コールバック/futureのメカニズムを学習するために用いる
目的3スレッドセキュリティの問題を学習するために使用する
現在実装されている機能:
1.
2. + future ( )
3.
4. ( )
コードの考え方を見る:
1 RedisClient
2 getAsync
, / ,handler
3 get
使用方法
:
@Test
public void getSetTest(){
int connNum = 100000;
CountDownLatch c = new CountDownLatch(connNum);//todo XXXX
RedisClient client = new RedisClient("127.0.0.1", 3333);
for(int i = 0; i < connNum; ++i){
String value = " " + i + " ";
client.set(i + "",value);
String result = client.get(i + "");
assert result.equals(value);
}
}
@Test
public void asyncGetSetTest() throws Exception{
//Logger.setDebug();
int connNum = 10000;
CountDownLatch c = new CountDownLatch(connNum);//todo XXXX
try (RedisClient client = new RedisClient("127.0.0.1", 3333)) {
// todo , ( , response )
testForOne(client,c,connNum);
}
c.await();
RedisClient.stop();//
}
/**
* ,
* client, countDownLatch c( , )
* **/
private void testForOne(RedisClient client,CountDownLatch c,int taskLoad){
for (int _i = 0; _i < taskLoad; _i++) {
final int i = _i;
String val = " ,master" + _i;
client.setAsync(_i + "",val ).addListener(future -> {
RedisFuture r = client.getAsync(i + "");
r.addListener(f -> {
c.countDown();
String result = (String)f.get();
assert result.equals(val);
});
});
};
}
解釈:実際には伝統的なコールバックの書き方で、
addListener
は実際にNettyが提供していることに注意して、もし多くのlistener
を追加すれば、それは次から次へと実行され、listener
の中にはブロック操作を書くことができないので、中に非同期操作があれば、コールバックセットのコールバックしかできません.Version-1アーキテクチャ
Nettyで実装されたクライアントを使用して、スレッドプールを保存し、Nettyオリジナルのコールバックを使用して簡単なパッケージを行います.
大まかな考え方:
decoder fastJson , ResponseHandler
, ,
future , connect
接続プール:
, EventLoop
同期と非同期:
Netty ChannelFuture ,
,
, netty `.sync` ,
マルチスレッドについて:
Handler , handler
, `ConcurrentLinkedQueue` `AtomicInteger`
非同期の具体的な実装:
1.まず、接続フェーズの非同期実装です.
,
1 connect
2
, :
Channel( ),
`sendInternal2` , , handler
, connect :
ChannelFuture cFuture = bootstrap.connect();
cFuture.addListener(f -> {
if(f.isSuccess()) {
sendInternal2(new RedisConnection(cFuture.channel()), future, type, payload);
}else{
future.setFailure(f.cause());
Logger.debug("connect failed:
" + f.cause());
}
});
2.次に、送信コマンドの実装(sendInternal 2):
private RedisFuture sendInternal2(Channel output,RedisFuture future, String type, Object payload) throws Exception {
// step2,
ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer();
final String reqId = RequestId.next();
writeStr(buf, reqId);
writeStr(buf, type);
writeStr(buf, JSON.toJSONString(payload));
Logger.debug(("send payload:" + JSON.toJSONString(payload)));
// todo
// handler,
while (output.pipeline().last() != null){
output.pipeline().removeLast();
}
// , decoder reqId, id
ResponseDecoder decoder = new ResponseDecoder();
decoder.setReqId(reqId);
output.pipeline().addLast(decoder);
// handler, handler future
RedisResponseHandler notifier = new RedisResponseHandler(future,this);//notifier future
output.pipeline().addLast(notifier);
output.writeAndFlush(buf);
// handler, RedisFuture
return future;
}
説明:
: Future Handler , RedisResponseHandler Future ,
* handler future
* :
* Future(Parameters)------> listener callback( type payLoad, )
* | |
* | | SendInteral2
* | |
* |-------------->decoder,responseHandler(Read Future1 )
不足:
callback hell
が存在することであり、コールバックにはブロック操作(nettyの制限)が許されないため、コールバックネストコールの方式readTimeout
,writeTimeout
のようなパラメータを加える必要があり,タイムアウト制限を加えると同時に性能が低下する.Version-2改良
RedisFuture
構造に改造され、従来のコールバック方式を残す場合、性能はやや劣るが、コールバック地獄を解消できる方式を提供する.1.考え方
Javascript Promiseという呼び出しにヒントを得たので、javaの中で1つ真似しましたが、違いはPromiseがキャンセルできず、最初の論理がすぐに実行されたことです.まずRedisFutureを拡張します.タイプの定義は次のとおりです.
public class RedisFuture extends DefaultPromise
Listenerは以前と似ていて、setSuccessが呼び出されたときにのみ実行を通知されます.この特徴を利用して、listenerの中で次のコールバックが実行を開始することを通知する論理を設定しなければなりません.私はこの関数を
notifyNextListener
にカプセル化します.そのため、listenerの中で他の非同期操作を続けたら、では、この非同期操作にコールバックを設定し、このコールバックでnotifyNextListener
を呼び出すことができます.notifyNextListener
の定義を見てみましょう. public void notifyNextListener(Object result){
if(next != null){
next.setSuccess(result);
}
}
setSuccess
を見てみましょう @Override
public RedisFuture setSuccess(Object result){
super.setSuccess(result);
// listener( listener「 」 next)
if(listener != null){
loop.execute(()-> listener.accept(this,result));
}
return this;
}
実は従来の
ChannelPromise
の論理を除いて、listener
に対する呼び出しを1つ増やしただけです.では、
next
はどこで生成されたのでしょうか.またどのようにlistener
にバインドされているのか、上のコードを見るとthen
という関数で生成されていることがわかりますので、then
は実際には構造関数であり、新しいRedisFuture
を生成するだけで、チェーン呼び出しを実現することができ、非同期の操作を同期の書き方のように見ることができます.実は私が非同期のコールバックの詳細を隠しただけです.このようにコールバックネストは最大2層を超えないが、性能はやや劣る.例:
private void testForThen(RedisClient client,CountDownLatch c,int taskLoad){
for (int _i = 0; _i < taskLoad; _i++) {
final int i = _i;
String val = "(hello) , (pig monster)" + _i;
client.setAsync(_i + "",val ).then((future,result) -> {
RedisFuture r = client.getAsync(i + "");
r.addListener(f -> future.notifyNextListener(f.get())); // get
}).then((future,result)->{
c.countDown();
assert result.equals(val);
future.notifyNextListener(1);
});
};
}
明らかに、
addListener
を使用して、最初のコールバックが完了すると2番目のコールバックが実行されるのではなく、いつ次のコールバックが実行されるかを制御することができます.Version 3アーキテクチャ
改善1:
Channel IO ,
改善2:
, , 。
構想:チャネルベースのIO多重化とは,pipelineと同様に1つのチャネル上で複数のコマンドを同時に待つメカニズムである.これはhttp 2とは異なり、http 2は本質的に複数のリクエストを乱順に実行することができるが、Javaredis自体は単一スレッドのモデルであり、得られる結果は一般的に非常に短いため、シーケンス実行は十分速い(結果の量が非常に大きく、例えば多くのピクチャのように、N個のリクエストがそれぞれ異なるピクチャを転送することができる)
したがって、requestIdとfutureとのつながりを記録することで、要求が来た後、非同期実行のfutureがコールバックを実行できることを通知することができる.これだけの記録ですが、実現にはかなりの細部が考慮されています.次に、最も重要ないくつかの点についてお話しします.
1 : v2 ,Channel 。v3 , channel , channel 。 channel , eventLoop 。
, channel , : eventLoop 。 , 。 。
2 :
,
, handler (v2 addOrClose ), handler
, , :
1
2
3
マルチスレッドの問題を解決し、パフォーマンスとコードロジックの簡単さを保証するために、複雑な状態制御を行わないことを選択しました.
`ConcurrentLinkedQueue`, `Connection` ( `eventloop` `handler` Connection )
, Connection pool ,
(eventloop )
eventloop , Connectin ( )
, :
capacity ? add capacity, `rebalanceNow` , , , ( eventLoop , pool , )
: pool capacity , rebalance , , ( )。
次に、接続プールのポリシーを考慮します.
1 :
/**
* pool, , ,
* cmdEachConnection:
* 1. ( ) capacity,
* 2. ,
* ( )
* , (RedisConnection.incrementConnectionNum();)
* ----------- RedisConnection incrementConnectionNum()? Connect , , 。 , 。
*
* */
public RedisConnection getConnection(){
RedisConnection ret = null;
int curSize = size.get();
while (curSize-- > 0 && ((ret = cachePool.poll()) != null)){
if(!ret.channel().isActive()){
size.decrementAndGet();
}else if(ret.getNum() > cmdEachConnection){
if(RedisConnection.getTotalConnectionNum() >= capacity){
size.decrementAndGet();
break;// .
}else{
/*
* : , ,
* , curSize 0, ret
* ret cachePool , ret = null
* */
cachePool.add(ret);
ret = null;
}
}else{
// connection
size.decrementAndGet();
break;
}
}
if(ret == null){
RedisConnection.incrementConnectionNum();
}
return ret;
}
2 :
? , 。 + ,
, , , pool 。 ( , )
capacity * k( 0 1)
3
public void submitRebalanceTask(EventLoop loop,int delay){
if (!rebalanceSubmit.compareAndSet(false,true)){return;}
loop.schedule(()->{
this.rebalanceNow();
rebalanceSubmit.set(false);},
delay,TimeUnit.SECONDS);
}
// ,
public void rebalanceNow(){
int tmpSize = size.get();
RedisConnection ret;
while (tmpSize-- > 0){
if((ret = cachePool.poll()) == null){ break;}//
// ret, idle
if(ret.isIdle()){
Logger.show("close channel" + ret.channel());
ret.close();//
size.decrementAndGet();
}else{
cachePool.add(ret);
}
}
}
パフォーマンスの向上:
V 2バージョンに比べて多くの性能(特に非同期操作)が向上し、現在、私のマシン(Mac 2.3 GHz Intel Core i 5、8 GB 2133 MHz LPDDR 3)でサーバとクライアントを同時に実行しています....同期単一スレッドは7500 QPSに達することができる…同期8スレッドは16000 QPSに達することができる…非同期単一スレッド:20000 QPS…非同期4スレッド:23000 QPS