NettyでRedisを実現(2)

13389 ワード

これはNettyで実現され、マルチスレッドと非同期呼び出し、lock-freeをサポートするJavaRedisクライアントです.
(サーババージョン:https://blog.csdn.net/pp634077956/article/details/82178710)
github: https://github.com/pyb1993/JavaRedisClient
目的1 JavaRedisサーバによるデバッグ
目的2各種非同期コールバック/futureのメカニズムを学習するために用いる
目的3スレッドセキュリティの問題を学習するために使用する
現在実装されている機能:
1.     
2.      + future  (      )
3.             
4.      (    )

コードの考え方を見る:
1       RedisClient   
2         getAsync    
            ,  /  ,handler   
3         get   
         

使用方法
    :
@Test
public void getSetTest(){
    int connNum = 100000;
    CountDownLatch c = new CountDownLatch(connNum);//todo XXXX
    RedisClient client = new RedisClient("127.0.0.1", 3333);
    for(int i = 0; i < connNum; ++i){
        String value = " " + i + " ";
        client.set(i + "",value);
        String result = client.get(i + "");
        assert result.equals(value);
    }
}

    
@Test
public void asyncGetSetTest() throws Exception{
    //Logger.setDebug();
    int connNum = 10000;
    CountDownLatch c = new CountDownLatch(connNum);//todo XXXX
    try (RedisClient client = new RedisClient("127.0.0.1", 3333)) {
        // todo    ,       (               ,   response    )
        testForOne(client,c,connNum);
    }
    c.await();
    RedisClient.stop();//
}
/**
 *         ,          
 * client, countDownLatch c(      ,         )
 * **/
private void testForOne(RedisClient client,CountDownLatch c,int taskLoad){
    for (int _i = 0; _i < taskLoad; _i++) {
        final int i = _i;
        String val = "  ,master" + _i;
        client.setAsync(_i + "",val ).addListener(future -> {
            RedisFuture r = client.getAsync(i + "");
            r.addListener(f -> {
                c.countDown();
                String result = (String)f.get();
                assert result.equals(val);
            });
        });
    };
}

解釈:実際には伝統的なコールバックの書き方で、addListenerは実際にNettyが提供していることに注意して、もし多くのlistenerを追加すれば、それは次から次へと実行され、listenerの中にはブロック操作を書くことができないので、中に非同期操作があれば、コールバックセットのコールバックしかできません.
Version-1アーキテクチャ
Nettyで実装されたクライアントを使用して、スレッドプールを保存し、Nettyオリジナルのコールバックを使用して簡単なパッケージを行います.
大まかな考え方:
     decoder fastJson     , ResponseHandler           
             ,              ,                  
     future      ,    connect               

接続プール:
                        ,       EventLoop 

同期と非同期:
           Netty   ChannelFuture   ,          
                     ,        
                    ,     netty `.sync`          ,         

マルチスレッドについて:
              Handler      ,    handler       
                        ,     `ConcurrentLinkedQueue`    `AtomicInteger`     

非同期の具体的な実装:
1.まず、接続フェーズの非同期実装です.
                            ,          
                     
    1 connect
    2               
           ,      :

              Channel(      ),         
            `sendInternal2`        ,               ,       handler     
        ,  connect       :
      ChannelFuture cFuture = bootstrap.connect();
        cFuture.addListener(f -> {
            if(f.isSuccess()) {
                sendInternal2(new RedisConnection(cFuture.channel()), future, type, payload);
            }else{
                future.setFailure(f.cause());
                Logger.debug("connect failed: 
" + f.cause()); } });

2.次に、送信コマンドの実装(sendInternal 2):
private RedisFuture sendInternal2(Channel output,RedisFuture future, String type, Object payload) throws Exception {
    //     step2,            
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer();
    final String reqId = RequestId.next();
    writeStr(buf, reqId);
    writeStr(buf, type);
    writeStr(buf, JSON.toJSONString(payload));
    Logger.debug(("send payload:" + JSON.toJSONString(payload)));

    // todo      
    //      handler,       
    while (output.pipeline().last() != null){
        output.pipeline().removeLast();
    }

    //      , decoder  reqId,                 id      
    ResponseDecoder decoder = new ResponseDecoder();
    decoder.setReqId(reqId);
    output.pipeline().addLast(decoder);

    //      handler,   handler   future
    RedisResponseHandler notifier = new RedisResponseHandler(future,this);//notifier    future      
    output.pipeline().addLast(notifier);

    output.writeAndFlush(buf);
    //        handler,     RedisFuture
    return future;
}

説明:
     : Future   Handler   ,  RedisResponseHandler   Future   ,
     *                      handler  future  
     *                      :
     *               Future(Parameters)------> listener callback(    type payLoad,         )
     *                  |                           |
     *                  |                           |      SendInteral2
     *                  |                           | 
     *                  |-------------->decoder,responseHandler(Read       Future1  )

不足:
  • の最大の問題はコールバック地獄callback hellが存在することであり、コールバックにはブロック操作(nettyの制限)が許されないため、コールバックネストコールの方式
  • と書くしかない.
  • 再接続なし、負荷等化などの機能
  • 読み書きタイムアウトの制御はサーバによって実現されるが,サーバは長リンクの制御であり,クライアント自身にも構成可能なreadTimeout,writeTimeoutのようなパラメータを加える必要があり,タイムアウト制限を加えると同時に性能が低下する.

  • Version-2改良RedisFuture構造に改造され、従来のコールバック方式を残す場合、性能はやや劣るが、コールバック地獄を解消できる方式を提供する.
    1.考え方
    Javascript Promiseという呼び出しにヒントを得たので、javaの中で1つ真似しましたが、違いはPromiseがキャンセルできず、最初の論理がすぐに実行されたことです.まずRedisFutureを拡張します.タイプの定義は次のとおりです.
     public class RedisFuture extends DefaultPromise {
     RedisFuture next;//    RedisFuture,                  
     EventLoop loop;
     BiConsumer listener;
    
    public RedisFuture(EventLoop loop){
        super(loop);
        this.loop = loop;
    }
      //     listener   setSuccess    ,   setSuccess  ChannelRead     
      // listener     「  」   next     
      //    next   listener   
    public RedisFuture then(BiConsumer listener){
         next = new RedisFuture(loop);
         this.listener = listener;
         return next;
    }
        ...
    }
    

    Listenerは以前と似ていて、setSuccessが呼び出されたときにのみ実行を通知されます.この特徴を利用して、listenerの中で次のコールバックが実行を開始することを通知する論理を設定しなければなりません.私はこの関数をnotifyNextListenerにカプセル化します.そのため、listenerの中で他の非同期操作を続けたら、では、この非同期操作にコールバックを設定し、このコールバックでnotifyNextListenerを呼び出すことができます.notifyNextListenerの定義を見てみましょう.
       public void notifyNextListener(Object result){
        if(next != null){
            next.setSuccess(result);
        }
     }
    
    setSuccessを見てみましょう
         @Override
    public RedisFuture setSuccess(Object result){
        super.setSuccess(result);
        //     listener(    listener「    」    next)
         if(listener != null){
             loop.execute(()-> listener.accept(this,result));
         }
         return this;
     }
    

    実は従来のChannelPromiseの論理を除いて、listenerに対する呼び出しを1つ増やしただけです.
    では、nextはどこで生成されたのでしょうか.またどのようにlistenerにバインドされているのか、上のコードを見るとthenという関数で生成されていることがわかりますので、thenは実際には構造関数であり、新しいRedisFutureを生成するだけで、チェーン呼び出しを実現することができ、非同期の操作を同期の書き方のように見ることができます.実は私が非同期のコールバックの詳細を隠しただけです.このようにコールバックネストは最大2層を超えないが、性能はやや劣る.
    例:
        private void testForThen(RedisClient client,CountDownLatch c,int taskLoad){
        for (int _i = 0; _i < taskLoad; _i++) {
            final int i = _i;
            String val = "(hello)  ,  (pig monster)" + _i;
            client.setAsync(_i + "",val ).then((future,result) -> {
                RedisFuture r = client.getAsync(i + "");
                r.addListener(f -> future.notifyNextListener(f.get()));  //  get              
            }).then((future,result)->{
                c.countDown();
                assert result.equals(val);
                future.notifyNextListener(1);
            });
        };
    }
    

    明らかに、addListenerを使用して、最初のコールバックが完了すると2番目のコールバックが実行されるのではなく、いつ次のコールバックが実行されるかを制御することができます.
    Version 3アーキテクチャ
    改善1:
              Channel IO    ,             
                      
    

    改善2:
               ,       ,                          。
    

    構想:チャネルベースのIO多重化とは,pipelineと同様に1つのチャネル上で複数のコマンドを同時に待つメカニズムである.これはhttp 2とは異なり、http 2は本質的に複数のリクエストを乱順に実行することができるが、Javaredis自体は単一スレッドのモデルであり、得られる結果は一般的に非常に短いため、シーケンス実行は十分速い(結果の量が非常に大きく、例えば多くのピクチャのように、N個のリクエストがそれぞれ異なるピクチャを転送することができる)
    したがって、requestIdとfutureとのつながりを記録することで、要求が来た後、非同期実行のfutureがコールバックを実行できることを通知することができる.これだけの記録ですが、実現にはかなりの細部が考慮されています.次に、最も重要ないくつかの点についてお話しします.
    1     :   v2  ,Channel                         。v3     ,    channel          ,          channel            。        channel               ,     eventLoop     。
    
               ,          channel              ,              : eventLoop         。                ,    。                             。
    
      2         :
                        ,                            
                                  ,  handler         (v2          addOrClose        ),     handler                  
                   ,                 ,         :
              1      
              2               
              3           
    

    マルチスレッドの問題を解決し、パフォーマンスとコードロジックの簡単さを保証するために、複雑な状態制御を行わないことを選択しました.
            `ConcurrentLinkedQueue`,                 `Connection`    (  `eventloop`     `handler`       Connection   )
                         ,     Connection  pool  ,                   
                                 (eventloop      )
         eventloop              ,      Connectin     (            )
                    ,       :
                  capacity    ?   add       capacity,              `rebalanceNow`    ,           ,          ,          (                  eventLoop    ,                pool     ,          )
                   :     pool capacity    ,             rebalance  ,        ,          (                      )。
    

    次に、接続プールのポリシーを考慮します.
        1          :
    /**
     *       pool,          ,                ,            
     *                cmdEachConnection:
     *        1.             (                )    capacity,           
     *        2.             ,    
     *                         (                            )
     *               ,               (RedisConnection.incrementConnectionNum();)    
    * -----------       RedisConnection           incrementConnectionNum()?  Connect                 ,                       ,                      。              ,               。
     *
    * */
    public  RedisConnection getConnection(){
        RedisConnection ret = null;
        int curSize = size.get();
        while (curSize-- > 0 && ((ret = cachePool.poll()) != null)){
            if(!ret.channel().isActive()){
                size.decrementAndGet();
            }else if(ret.getNum() > cmdEachConnection){
                if(RedisConnection.getTotalConnectionNum() >= capacity){
                    size.decrementAndGet();
                    break;//            .           
                }else{
                    /*
                    *     :          ,         ,              
                    *      ,   curSize       0,    ret   
                    *  ret           cachePool  ,  ret = null        
                    * */
                    cachePool.add(ret);
                    ret = null;
                }
            }else{
                //        connection
                size.decrementAndGet();
                break;
            }
        }
        if(ret == null){
            RedisConnection.incrementConnectionNum();
        }
    
        return  ret;
    }   
    
    2         :
                 ?     ,              。            +         ,
                   ,            ,                 ,     pool         。                 (             ,        )
                capacity * k(   0   1)
    
    3         
        public void submitRebalanceTask(EventLoop loop,int delay){
        if (!rebalanceSubmit.compareAndSet(false,true)){return;}
        loop.schedule(()->{
                    this.rebalanceNow();
                    rebalanceSubmit.set(false);},
                delay,TimeUnit.SECONDS);
    }
    
    //          ,         
    public void rebalanceNow(){
        int tmpSize = size.get();
        RedisConnection ret;
        while (tmpSize-- > 0){
            if((ret = cachePool.poll()) == null){ break;}//     
            //                ret,    idle          
            if(ret.isIdle()){
                Logger.show("close channel" + ret.channel());
                ret.close();//               
                size.decrementAndGet();
            }else{
                cachePool.add(ret);
            }
        }
    }
    

    パフォーマンスの向上:
    V 2バージョンに比べて多くの性能(特に非同期操作)が向上し、現在、私のマシン(Mac 2.3 GHz Intel Core i 5、8 GB 2133 MHz LPDDR 3)でサーバとクライアントを同時に実行しています....同期単一スレッドは7500 QPSに達することができる…同期8スレッドは16000 QPSに達することができる…非同期単一スレッド:20000 QPS…非同期4スレッド:23000 QPS