複数のネットワークリクエストrxjavaのマージ

10584 ワード

プロジェクトにユーザの3つの部分を取得する情報が表示され、ログイン後に直接すべて取得してローカルに保存することにします.
rxjavaの学習では,多様な実装方式が見出された.
zipを採用し始めたばかりです.
flowable 1,flowable 2,flowable 3はそれぞれ3つのネットワークリクエストである.
Flowable.zip(flowable1, flowable2, flowable3, new Function3, ShopInfo, UserProfile,
                LoginInfo>() {
    @Override
    public LoginInfo apply(UserInfo userInfo, ShopInfo shopInfo, UserProfile userProfile) throws Exception {
        LoginInfo loginInfo = new LoginInfo();
        loginInfo.profile = userProfile;
        loginInfo.user = userInfo;
        loginInfo.shop = shopInfo;
        return loginInfo;
    }
}).subscribeWith(new HttpResultSubscriber() {
    @Override
    public void onNext(LoginInfo loginInfo) {
        callBack.setInfo(loginInfo);//    
    }

    @Override
    public void onError(Throwable t) {
        super.onError(t);
    }
});
3つのリクエストはマージ後に同時に送信されるため、リクエストイベントにエラーが発生した場合はonErrorに進みます.
他のリクエストが通ってもonNextまでは行けません
しかし、いくつかのリクエストのデータを保存するには、各イベントを別の方法で処理する必要があります.
最初はmergeを採用していました
Flowable.merge(flowable1, flowable2, flowable3)
       .subscribeWith(new HttpResultSubscriber() {
                          @Override
                          public void onNext(Object object) {
                              LoginInfo loginInfo = new LoginInfo();
                              if (object instanceof UserInfo) {
                                  loginInfo.user = (UserInfo) object;
                              }
                              if (object instanceof UserProfile) {
                                  loginInfo.profile = (UserProfile) object;
                              }
                              if (object instanceof ShopInfo) {
                                  loginInfo.shop = (ShopInfo) object;
                              }
                              callBack.setInfo(loginInfo);//    

                          }
                      }
       );
しかし、retrofitのinterceptorが「java.io.InterruptedIOException:thread interrupted」とエラーを報告していることがわかり、目視はリクエストの送信が早すぎるためです.
クエリー資料ではmergeがイベントを順番に送信していないことがわかり、順番に送信したconcatを交換してもエラーはありません.
Flowable.concat(flowable1, flowable2, flowable3)
       .subscribeWith(new HttpResultSubscriber() {
                          @Override
                          public void onNext(Object object) {
                              LoginInfo loginInfo = new LoginInfo();
                              if (object instanceof UserInfo) {
                                  loginInfo.user = (UserInfo) object;
                              }
                              if (object instanceof UserProfile) {
                                  loginInfo.profile = (UserProfile) object;
                              }
                              if (object instanceof ShopInfo) {
                                  loginInfo.shop = (ShopInfo) object;
                              }
                              callBack.setInfo(loginInfo);//    

                          }
                      }
       );

もちろん、各リクエスト間に依存関係がある場合はmapまたはflatMapオペレータを使用できます.
例えばflowable 2とflowable 3がflowable 1に依存するデータを必要とする場合、flatmapとzipを組み合わせて
やや複雑に見える.
 
  
final Flowable flowable = Flowable.zip(flowable2, flowable3, new
        BiFunction, UserProfile, LoginInfo>() {
            @Override
            public LoginInfo apply(ShopInfo shopInfo, UserProfile userProfile) throws Exception {
                LoginInfo loginInfo = new LoginInfo();
                loginInfo.profile = userProfile;
                loginInfo.shop = shopInfo;
                return loginInfo;//flowable2 3    
            }
        });

flowable1.flatMap(new Function, Flowable>() {
    @Override
    public Flowable apply(UserInfo userInfo) throws Exception {
        LoginInfo loginInfo = new LoginInfo();
        loginInfo.user = userInfo;
        callBack.setInfo(loginInfo);//flowable1  
        return flowable;
    }
}).subscribeWith(new HttpResultSubscriber() {
    @Override
    public void onNext(LoginInfo loginInfo) {
        callBack.setInfo(loginInfo);//flowable2 3        
    }

    @Override
    public void onError(Throwable t) {
        super.onError(t);
    }
});

目視はcomposeを利用して、後で補充することもできます.RxJavaは実に強い.