Rxjavaシリーズ(七)RxJava 2.0背圧原理解析

8027 ワード

RxJava2.0背圧のサポートが大きな特徴で、背圧を使うにはFlowableが必要です.なぜこのメカニズムを背圧する必要があるのか、Flowableを捨てて言わないで、実際の応用における実例を考えてみましょう.送信イベントと受信イベントが異なるスレッドにあり、イベント処理の速度が遅く、イベント送信の速度が速い場合、送信されたイベントを保存するために下流の消化を待つプールが必要になります.そうしないと、メッセージが失われます.送信イベントの速度が速く、受信イベントの速度が遅い場合、このプールはますます大きくなり、メモリが爆発します.背圧戦略はまさにこの問題を解決するために引き起こされたものだ.
まず,非同期の場合,上流送信イベントの速度と下流処理イベントの速度をどのように一致させることができるかを考える.下流が上流の処理速度をタイムリーに伝え、上流の送信イベントの速度を遅くすればいいのではないでしょうか.この問題を持ってまずRxJava 2を見てみましょう.0の背圧戦略の実現.まずFlowableの基本的な使用例を見てみましょう.
Flowable.create(new FlowableOnSubscribe() {
            @Override
            public void subscribe(FlowableEmitter emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        mSubscription.request(1);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                        mSubscription.request(1);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

上記の例のFlowableの具体的な使用方法をよく見ると、私たちの前のRxjavaの使用経験とは異なる2つの場所があります.1つはFlowableを作成するにはBackpressuuresStrategyのパラメータを入力する必要があり、もう1つはSubscriberのonSubscribeメソッドでsubscriptionを呼び出す必要があります.requestメソッド.この二つは何に使いますか.最初に私たちが提案した問題に戻って、上流送信イベントと下流処理イベントの速度が一致しない問題を解決するために、下流に下流の処理能力がどのようなものかを伝える戦略が必要です.この2つの違いはこの問題を解決するためです.まず、BackpressureStrategyパラメータの意味を説明します.ここでは、BackpressureStrategyについて詳しく説明します.ERROR:上流送信イベント速度が下流処理イベント能力を超え、イベントキャッシュプールが満タンである場合、異常BackpressureStrategyを放出する.BUFFER:上流送信イベント速度が下流処理能力を超えると、イベントを格納して下流処理待ちBackpressureStrategy.DROP:上流送信イベント速度が下流処理能力を超えると、イベントキャッシュプールがいっぱいになると、その後送信イベントをBackpressureStrategyに破棄する.LATEST:送信時間速度が下流処理能力を超えている場合、最新の128個のイベントコメントのみが格納されます:128はFlowableのイベントキャッシュプールのサイズです
ここでは、イベントキャッシュプールのサイズが128である理由を説明します.Flowableのソースコードの一部を見てみましょう.
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128));
}

Flowableのデフォルトのbufferサイズは128で、上流のイベントを格納します.
ではこれはrequestメソッドは主に何に用いられるのか,このメソッドは下流で上流にイベントを処理する能力を知らせるために用いられる.Flowableは、上下流の処理速度が統一されていないという問題を解決するために、応答式抽出の考え方を採用しています.簡単に言えば、応答式引き抜きとは、下流が上流に自分の処理能力を知らせ、下流が1つのイベントを処理した後、上流に伝え、上流はイベントを送信し続け、下流の通知を待つことである.subscriptionが呼び出されると.request(1)の時、下流は上へ遊説します:私は今1つの事件を処理することができます.
私たちの上の例は、Flowableの使用時の違いと背圧の基本的な概念を大まかに紹介するだけです.では、実際の使用では、Flowableが提供するこれらのメカニズムをどのように使用して上下流の処理速度を統一するかを分析し続けます.前にsubscriptionの呼び出しについて言及しました.requestメソッドの後,下流は上流自身の処理能力を通知し,上流は下流の処理能力を知ってからイベントをどのように送信するかを決定する必要がある.では、上流はどのような方法でこのデータを知っているのでしょうか.まず、上流でイベントを送信するFlowableEmitterのソースコードの一部を見てみましょう.
public interface FlowableEmitter extends Emitter {
    void setDisposable(Disposable s);
    void setCancellable(Cancellable c);

    /**
     * The current outstanding request amount.
     * 

This method is thread-safe. * @return the current outstanding request amount */ long requested(); // boolean isCancelled(); FlowableEmitter serialize(); }


FlowableEmitterには、現在のリクエスト数を取得するためのメソッドrequestedがあります.では、この方法が現在の下流処理能力を取得するインタフェースであるかどうかは、検証のために小さなdemoを書くことができます.
        Flowable
                .create(new FlowableOnSubscribe() {
                    @Override
                    public void subscribe(FlowableEmitter emitter) throws Exception {
                        Log.d(TAG, "First requested = " + emitter.requested());
                        boolean flag;
                        for (int i = 0; ; i++) {
                            flag = false;
                            while (emitter.requested() == 0) {
                                if (!flag) {
                                    Log.d(TAG, "can't emit value !");
                                    flag = true;
                                }
                            }
                            emitter.onNext(i);
                            Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
                        }
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        mSubscription.request(3);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

実行後、結果を見ることができます.
D/TAG: onSubscribe                        
D/TAG: requested = 128        
D/TAG: emit event 1, requested = 127        
D/TAG: onNext: 1                          
D/TAG: emit event 2, requested = 126
D/TAG: onNext: 2
D/TAG: emit event 3, requested = 125
D/TAG: onNext: 3   
D/TAG: emit event 4, requested = 124 
D/TAG: emit event 5, requested = 123
......
D/TAG: emit event 128, requested = 0           

コードによって走り出された結果,下流でs.request()で設定されたリクエスト値がいくらであっても,上流で取得した初期のrequestedリクエスト数は128であり,イベントの送信に伴ってこの値は減少することが分かった.この理由は何でしょうか.Flowableの内部には128サイズのイベントキャッシュプールがあり、上流から発生したイベントは、イベントキャッシュに先に配置され、下流で処理され、下流で処理できないとキャッシュプールにずっと存在します.そのため上流ではイベントが発生する際、キャッシュプールに依然としてスペースがあるかどうかを考慮する必要があり、キャッシュプールにスペースがないままイベントを送信し続けると異常が発生しやすい.これは、requestedの値が下流要求のイベント数ではない理由であり、上流で送信されたイベント数が下流のデータ需要量を超えても、キャッシュに配置することもできます.下流リクエスト量を超えない場合は、キャッシュプールのデータを下流に渡します.
それは...requested()の値は128個のデータを送信した後に0に減少し,demoの表記ではイベントの送信を継続することはできないが,下流のリクエスト量が128より大きいと128以上のイベントは取得できないのではないか.事実はそうですか?私たちはdemoを通じて検証を続けます.私たちはdemoの中でmSubscriptをします.request(3)をmSubscriptionに変更する.request(96).結果は次のようになります.
D/TAG: onSubscribe                        
D/TAG: requested = 128        
D/TAG: emit event 1, requested = 127        
D/TAG: onNext: 1                          
D/TAG: emit event 2, requested = 126
D/TAG: onNext: 2
......
D/TAG: emit event 95, requested = 33
D/TAG: onNext: 95   
D/TAG: emit event 96, requested = 95 
D/TAG: emit event 97, requested = 94
......
D/TAG: emit event 223, requested = 0   

結果から分かるようにrequested()の数はずっと減少しているわけではなく,33に減少すると95に回復した.つまり、非同期キャッシュ・プール内のイベントは、下流で1つ処理すると1つクリアされるのではなく、下流で95個を累計処理した後、キャッシュ・プールを集中的にクリーンアップすると、キャッシュ・プールに95個の空きスペースがあり、キャッシュ・プールにイベントを格納し続けることができます.このようなポリシー設計では,バックプレッシャーを使用する際に非同期キャッシュプールに格納できるかどうかを考慮するだけで,キャッシュプールにスペースがあればイベントを送信し続けることができ,スペースがなければ95件を下流で処理してから送信を継続する.Flowableの背圧メカニズムはこのように動作します.