OkHttp Post制限速度アップロード

10928 ワード

ローカルエリアネットワークでOkHttp postを通じていくつかの大きなファイルをアップロードし、テスト後、ファイルのアップロードは常に帯域幅を満たし、ビジネスのインタラクションに影響を与えることが分かった.アップロードには速度制限が必要です.
制限速度は、サービス側で制限してもよいし、クライアントで制限してもよい.サービス側の制限速度は遅延許容にすぎず、TCPバッファが渋滞し、帯域幅の問題は本当に解決されていない.クライアントの速度制限の考え方はSocketの速度制限を書くことです.資料を探してみると、OkHttpは速度制限のインタフェースを提供していません.
OkHttpのブロッカーInterceptorを調べてみました
ネットワークアクセスの実行RealCall
RealCallのexecute関数でgetResponseWithInterceptorChain関数を呼び出してネットワークのResponseを取得します.
RealCall.java 
 @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

二OkHttp Interceptorの実現.
コードから、ブロッキングは配列に追加されないことがわかります.次の順になります.
  • client.interceptors()カスタムブロッキング
  • retryAndFollowUpInterceptor
  • BridgeInterceptor
  • CacheInterceptor
  • ConnectInterceptor
  • CallServerInterceptor

  • 次のブロッキングの実行は順序に関係するため、順序が重要です.本格的なネットカフェアクセスはCallServer Interceptorで
    RealCall.java 
      Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
    

    三RealInterceptorChainの実行
    RealInterceptorChain実行はチェーン式のプロセスです.注意はRealCallです.JAvaでRealInterceptorChainを構築して入力するindexパラメータは0であり、proceed関数でnewの新しいRealInterceptorChain next、nextのindexが追加されます.
  • this.interceptor.intercept(next); 注意新しいブロッキングをパラメータとして入力しました.
  • @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    
      public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpCodec, connection, index + 1, request);
        Interceptor interceptor = interceptors.get(index);
        
        Response response = interceptor.intercept(next);
    
    
        return response;
      }
    

    ブロック内で必要に応じて
  • Requestを処理する前にブロッキングを処理する論理である、次いでnextブロッキングのproceedをチェーン的に呼び出す.
  • Responseがnextブロッキングを呼び出してブロッキングを処理する論理である場合.

  • このデザインはいいですね.
    四CallServer Interceptor
    CallServerInterceptorは、ネットワークの読み書きを担当する場所で、制限速度を実現するには、ここが最大の可能性です.
        request.body().writeTo(bufferedRequestBody);
    

    requestのbodyでネットワークにデータを書きます.このbodyはどこから来たのか、postでアップロードしたデータを構築する際には、MultipartBodyを構築してアップロードしたファイルをカプセル化する必要があります.
    public CallServerInterceptor(boolean forWebSocket) {
        this.forWebSocket = forWebSocket;
      }
    
      @Override public Response intercept(Chain chain) throws IOException {
        
    
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
         
          if (responseBuilder == null) {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
            // being reused. Otherwise we're still obligated to transmit the request body to leave the
            // connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    
        httpCodec.finishRequest();
    
        if (responseBuilder == null) {
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
        
        return response;
      }
    

    5 RequestBody
    コードから分かるように、RequestBodyは抽象クラスであり、MultipartBodyを通過している.create直接new出てきます.私たちの考えは、このRequestBodyのwriteTo関数を修正し、Socketを書く速度を制御することです.
        public MultipartBody.Part getMultipartBodyPart(){
            RequestBody requestFile = MultipartBody.create(MediaType.parse("multipart/form-data"), new File(mFileEncrypt));
            MultipartBody.Part fileBody = MultipartBody.Part.createFormData(FILE_ENCRYPT, mFileEncrypt, requestFile);
    
            return fileBody;
        }
        
          public static RequestBody create(final @Nullable MediaType contentType, final File file) {
        if (file == null) throw new NullPointerException("content == null");
    
        return new RequestBody() {
          @Override public @Nullable MediaType contentType() {
            return contentType;
          }
    
          @Override public long contentLength() {
            return file.length();
          }
    
          @Override public void writeTo(BufferedSink sink) throws IOException {
            Source source = null;
            try {
              source = Okio.source(file);
              sink.writeAll(source);
            } finally {
              Util.closeQuietly(source);
            }
          }
        };
      }
    

    六RateLimitingRequestBody
    修正されたコードは以下の通りで、OkIOのいくつかの操作に対していくつかのコードをコピーしました.またコンパイルの問題でOkIO.source法は反射を用いた.
    public MultipartBody.Part getMultipartBodyPart(){
            RequestBody requestFile = RateLimitingRequestBody.createRequestBody(MediaType.parse("multipart/form-data"), new File(mFileEncrypt), UPLOAD_RATE);
            MultipartBody.Part fileBody = MultipartBody.Part.createFormData(FILE_ENCRYPT, mFileEncrypt, requestFile);
    
            return fileBody;
    }    
        
    public class RateLimitingRequestBody extends RequestBody {
    
        private MediaType mContentType;
        private File mFile;
        private int mMaxRate;    // bit/ms
    
        private RateLimitingRequestBody(@Nullable final MediaType contentType, final File file, int rate){
            mContentType = contentType;
            mFile = file;
            mMaxRate = rate;
        }
    
        @Override
        public MediaType contentType() {
            return mContentType;
        }
    
        @Override
        public void writeTo(BufferedSink sink) throws IOException {
    
            Source source = null;
    
            try {
    
                /*
                *  reflect instead of Okio.source(mFile) because of build error at platform 23.
                *  the error is java.nio.** can't find.
                */
    
                // source = Okio.source(mFile);
    
                String className = "okio.Okio";
                String methodName = "source";
                Class> clazz = Class.forName(className);
                Method method = clazz.getMethod(methodName, File.class);
                source = (Source) method.invoke(null, mFile);
                writeAll(sink, source);
    
            } catch (InterruptedException e) {
                NLog.exception("writeTo", e);
            } catch (NoSuchMethodException e) {
                NLog.exception("writeTo", e);
            } catch (IllegalAccessException e) {
                NLog.exception("writeTo", e);
            } catch (InvocationTargetException e) {
                NLog.exception("writeTo", e);
            } catch (ClassNotFoundException e) {
                NLog.exception("writeTo", e);
            } finally {
                Util.closeQuietly(source);
            }
        }
    
    
        public long writeAll(BufferedSink sink, Source source) throws IOException, InterruptedException {
            if (source == null) {
                throw new IllegalArgumentException("source == null");
            } else {
                long totalBytesRead = 0L;
    
                long readCount;
                long start = System.currentTimeMillis();
                while((readCount = source.read(sink.buffer(), 8192L)) != -1L) {
                    totalBytesRead += readCount;
                    sink.emitCompleteSegments();
    
                    long time = System.currentTimeMillis();
                    if(time == start) continue;
                    long rate = (totalBytesRead * 8) / (time - start);
    
                    if(rate > mMaxRate/1000){
                        int sleep = (int) (totalBytesRead * 8 * 1000 / mMaxRate - (time - start));
                        NLog.v("writeAll","totalBytesRead:"+totalBytesRead+"B "+ " Rate:"+rate*1000+"bits");
                        NLog.d("writeAll", "sleep:"+sleep);
                        Thread.sleep(sleep+500);
                    }
                }
    
                long end = System.currentTimeMillis();
                long rate = (totalBytesRead * 8 * 1000) / ((end - start));
                NLog.e("writeAll","totalBytesRead:"+totalBytesRead+"B "+ " Rate:"+rate+"bits"+" total time:"+(end-start));
                return totalBytesRead;
            }
        }
    
    
        public static RequestBody createRequestBody(@Nullable final MediaType contentType, final File file, int rate) {
            if (file == null) {
                throw new NullPointerException("content == null");
            } else {
                return new RateLimitingRequestBody(contentType, file, rate);
            }
        }
    }