RxJava変換オペレータbuffer

6554 ワード

bufferオペレータの機能:
1:複数の結果を一度にリストにまとめることができ、サブスクリプション後に自動的に結果をクリアし、完全にクリアするまで
2:複数の結果を周期的にリストに集約し、サブスクリプション後に完全に消去されるまで自動的に結果を空にすることもできます.
例1:1回に2個ずつ購読
// 2 
        Observable.range(1,5).buffer(2).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onCompleted() {
                LogUtils.d("-----------------onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.d("----------------->onError:");
            }

            @Override
            public void onNext(List<Integer> strings) {
                LogUtils.d("----------------->onNext:" + strings);
            }
        });

結果を表示:
02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2] 02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[3, 4] 02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: ----------------->onNext:[5] 02-20 15:52:16.433 15913-15913/com.rxandroid.test1 D/----->: -----------------onCompleted:
例2:一度に全部購読する
  // 
        Observable.range(1,5).buffer(5).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onCompleted() {
                LogUtils.d("-----------------onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.d("----------------->onError:");
            }

            @Override
            public void onNext(List<Integer> strings) {
                LogUtils.d("----------------->onNext:" + strings);
            }
        });
結果:
02-20 15:54:56.423 21917-21917/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5] 02-20 15:54:56.423 21917-21917/com.rxandroid.test1 D/----->: -----------------onCompleted:
例3:1つずつ削除
  // 1 
        Observable.range(1, 5).buffer(5, 1).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onCompleted() {
                LogUtils.d("-----------------onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.d("----------------->onError:");
            }

            @Override
            public void onNext(List<Integer> strings) {
                LogUtils.d("----------------->onNext:" + strings);
            }
        });

02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[2, 3, 4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[3, 4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[4, 5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: ----------------->onNext:[5]
02-20 16:05:53.323 26556-26556/com.rxandroid.test1 D/----->: -----------------onCompleted:
注意skip=countの場合、フレームワークは同じ操作でcount個の要素を一度にクリアしたと考えられます.
Observable.range(1, 5).buffer(5, 5).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onCompleted() {
                LogUtils.d("-----------------onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.d("----------------->onError:");
            }

            @Override
            public void onNext(List<Integer> strings) {
                LogUtils.d("----------------->onNext:" + strings);
            }
        });

02-20 16:09:24.343 14991-14991/com.rxandroid.test1 D/----->: ----------------->onNext:[1, 2, 3, 4, 5]
02-20 16:09:24.343 14991-14991/com.rxandroid.test1 D/----->: -----------------onCompleted:
例4:複数の結果を定期的に購読
 Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                while (true) {
                    subscriber.onNext(" " + SystemClock.elapsedRealtime());
                    SystemClock.sleep(2000);// 2s 
                }

            }
        }).subscribeOn(Schedulers.io()).
                buffer(3, TimeUnit.SECONDS).// 3   
                subscribe(new Observer<List<String>>() {
            @Override
            public void onCompleted() {
                LogUtils.d("-----------------onCompleted:");
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.d("----------------->onError:");
            }

            @Override
            public void onNext(List<String> strings) {
                LogUtils.d("----------------->onNext:" + strings);
            }
        });

02-20 16:55:33.283 17087-18151/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370507667、メッセージ370509668]
02-20 16:55:36.323 17087-18151/com.rxandroid.test 1 D/---->:--------------->onNext:[メッセージ370511668]
02-20 16:55:39.303 17087-18151/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370513669、メッセージ370515669]
02-20 16:55:54.883 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370529168、メッセージ370531172]
02-20 16:55:57.863 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370533184]
02-20 16:56:00.883 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370535184、メッセージ370537184]
02-20 16:56:03.863 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370539184]
02-20 16:56:06.863 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370541185、メッセージ370543204]
02-20 16:56:09.863 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370545204]
02-20 16:56:12.863 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370547204、メッセージ370549204]
02-20 16:56:15.863 23122-23316/com.rxandroid.test 1 D/---->:-------->onNext:[メッセージ370551204]