Http要求接続池-HttpClientのAbstractConPoolソース分析


背景
サービスの分割をする時、性能が特に高いシーンを要求しないなら、私達は一般的にHttpサービスを露出します。SpringにはテンプレートクラスRestTemplateが提供されています。構成RestTemplateを通じて、外部のHttpサービスに素早くアクセスできます。Httpの下層部はTcpの3回の握手によって接続が確立されており、各要求が接続を再確立すると、そのオーバーヘッドは大きく、特にメッセージボディにとって非常に小さいシーンで、オーバーヘッドはより大きい。
接続池の方式を利用すれば、接続先を管理し、サービスのスループットを大幅に向上させることができる。RestTemplateの下層部はHttpClientをカプセル化したもので、高合併ネットワーク要求を処理するための接続プール機構を提供している。

通常、私たちは次のようなサンプルコードを採用してHttpClientを構築します。
    HttpClientBuilder builder = HttpClientBuilder.create();
    builder.setMaxConnTotal(maxConnections).setMaxConnPerRoute(maxConnectionsPerRoute);
    if (!connectionReuse) {
        builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
    }
    if (!automaticRetry) {
        builder.disableAutomaticRetries();
    }
    if (!compress) {
        builder.disableContentCompression();
    }
    HttpClient httpClient = builder.build();
上記のコードから分かるように、HttpClientは建築者設計モードを使用してオブジェクトを構成し、最後の行のコード構築対象として、前のコードはクライアントの最大接続数、単ルート最大接続数、長接続、圧縮などの特性を設定するために使用されています。
ソース分析
私たちはHttpClientBuiderのbuild()方法に入ると、次のコードが見られます。
    #   Http      
    final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
            RegistryBuilder.create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", sslSocketFactory)
                .build());
    if (defaultSocketConfig != null) {
        poolingmgr.setDefaultSocketConfig(defaultSocketConfig);
    }
    if (defaultConnectionConfig != null) {
        poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig);
    }
    if (systemProperties) {
        String s = System.getProperty("http.keepAlive", "true");
        if ("true".equalsIgnoreCase(s)) {
            s = System.getProperty("http.maxConnections", "5");
            final int max = Integer.parseInt(s);
            poolingmgr.setDefaultMaxPerRoute(max);
            poolingmgr.setMaxTotal(2 * max);
        }
    }
    if (maxConnTotal > 0) {
        poolingmgr.setMaxTotal(maxConnTotal);
    }
    if (maxConnPerRoute > 0) {
        poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute);
    }
    # Http               
    connManager = poolingmgr;
デフォルトで構築されたHttp接続マネージャは、接続池として実現される。 PoolingHttpClientConnectionManagerのコードに入ります。接続池のコア実装は CPoolクラスに依存しています。 CPoolはまた抽象類AbstractConnPoolを継承しています。 AbstractConnPool@ThreadSafeの注釈があり、これはスレッドセキュリティクラスであると説明していますので、 HttpClientスレッドが安全に取得され、接続が AbstractConnPoolに依存しています。
次に、一番核心のAbstractConnPool類を見にきました。以下は接続池の構造図です。
接続池の最も重要な2つの公有方法は、 leaseおよびreleaseであり、接続を取得し、解放する2つの方法である。
lease取得接続
    @Override
    public Future lease(final T route, final Object state, final FutureCallback callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");
        return new PoolEntryFuture(this.lock, callback) {

            @Override
            public E getPoolEntry(
                    final long timeout,
                    final TimeUnit tunit)
                        throws InterruptedException, TimeoutException, IOException {
                final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
                onLease(entry);
                return entry;
            }

        };
    }
lease方法は、 Futureオブジェクト、すなわち Futureを呼び出す必要があるget方法であり、PoolEntryのオブジェクトを得ることができ、接続の具体的な情報を含む。
接続を取得することは、 getPoolEntryBlocking方法によって実現され、対応する接続池の接続が不十分であるときには、routeに対応する接続池が接続解放されるまで、関数名によって知ることができる。または接続がタイムアウトするまで、方法はずっと待っています。
    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final PoolEntryFuture future)
                throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        //          
        if (timeout > 0) {
            deadline = new Date
                (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        //     ,        ,    --->    
        this.lock.lock();
        try {
            //  Map    route      , Map   ,    route      
            final RouteSpecificPool pool = getPool(route);
            E entry = null;
            while (entry == null) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    //              ,  available        ,   leased   
                    entry = pool.getFree(state);
                    //        ,    
                    if (entry == null) {
                        break;
                    }
                    //       ,     
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    } else if (this.validateAfterInactivity > 0) {
                        if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                            if (!validate(entry)) {
                                entry.close();
                            }
                        }
                    }
                    if (entry.isClosed()) {
                        //        ,   available        
                        this.available.remove(entry);
                        //   route       leased        ,      available                           
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                //   for  
                if (entry != null) {
                    //          ,      available    ,    leased   
                    //       ,    
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
                //    route      
                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                  //    route                 route     
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                //       route     ,    route    
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        //    route             , available       
                        //        ,        available     ,                 
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        //     ,          route         
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
                //  route          route     
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        //                       
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                //    available       route            ,     
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        //      ,      leased    route    leased   ,    
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
                
                //route      ,      
                boolean success = false;
                try {
                    //            pending  
                    pool.queue(future);
                    this.pending.add(future);
                    //     ,         ,   true;        ,   false
                    success = future.await(deadline);
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    //          、               ,    finally   
                    //  pending     
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                //               
                //        ,   while  ,           ;
                //       ,         
                if (!success && (deadline != null) &&
                    (deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            //    
            this.lock.unlock();
        }
    }
リリースコネクション
    @Override
    public void release(final E entry, final boolean reusable) {
        //    
        this.lock.lock();
        try {
            //    leased       
            if (this.leased.remove(entry)) {
                final RouteSpecificPool pool = getPool(entry.getRoute());
                //     
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown) {
                    this.available.addFirst(entry);
                    onRelease(entry);
                } else {
                    entry.close();
                }
                //   pending       (      ),        
                PoolEntryFuture future = pool.nextPending();
                if (future != null) {
                    this.pending.remove(future);
                } else {
                    future = this.pending.poll();
                }
                if (future != null) {
                    future.wakeup();
                }
            }
        } finally {
            //    
            this.lock.unlock();
        }
    }
締め括りをつける routeは、接続を取得したり、接続を解除したりする際にロックをかけたりすることによって、スレッドの安全を実現するという考えは非常に簡単であるが、routeに対応する接続池にロックオブジェクトを入れていない、つまり AbstractConnPoolの取得接続、解除接続操作はロックされていない。
また、各 RouteSpecificPoolは、1つの接続池に対応しており、 AbstractConnPoolレベルでの分離を実現しており、下流のある台がサービスを提供するホストが保留されている場合、無効な接続は最大route対応の接続池だけを占有し、接続池全体を占有しないので、サービス全体を牽引する。
以上です
原文のリンク
https://segmentfault.com/a/11...