RxJavaとマルチスレッドの同時実行について簡単に説明します

10574 ワード

前言
RxJavaに対して、みんなはすべてとても熟知するべきで、彼の最も核心の2つの字は非同期で、確かに、それは非同期の処理に対してとても優れて、しかし非同期は絶対に同時ではありませんて、更にスレッドの安全に等しくありませんて、もしこのいくつかの概念を混同したら、誤ってRxJavaを使って、とても多くの問題を持ってきます.
RxJavaと同時
まず、RxJavaプロトコルの原文を見てみましょう.

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

前述したように、RxJavaはマルチスレッド同時化に対して非常に多くの保護を行っていないが、この話では、複数のObservablesが複数のスレッドからデータを送信する場合、happens-beforeの原則を満たさなければならない.
簡単な例を見てみましょう.

final PublishSubject subject = PublishSubject.create();

subject.subscribe(new Subscriber() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  Log.d("TAG", "onNext: " + unSafeCount);
 }
});

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;
  for(int i = 0;i < 10;i++) {
   new Thread(new Runnable() {
    @Override
    public void run() {
     for (int j = 0; j < 1000; j++) {
      subject.onNext(unit);
     }
    }
   }).start();
  }
 }
});

これは最も典型的なマルチスレッド問題であり、10個のスレッドからデータを送信して加算し、最終的に得られる答えは10000未満である.RxJavaを使用していますが、このような使用は同時化には意味がありません.RxJavaは同時化による問題を処理していないからです.subjectのonNextメソッドのソースコードを見ることができます.中は簡単です.observerに対応するonNextメソッドを呼び出しただけです.それだけでなく、ほとんどのSubjectはスレッドが安全ではないので、このようなクラスを使用している場合(典型的なシーンは自家製RxBus)、複数のスレッドからデータを送信する場合は気をつけてください.
このような問題には、次の2つの解決策があります.
1つ目は、intの代わりにAtomicIntegerを使用するなど、従来の解決策を簡単に使用することです.
2つ目はRxJavaを使用するソリューションで、ここではSubjectの代わりにSerializedSubjectを使用します.

final PublishSubject subject = PublishSubject.create();

subject.subscribe(new Subscriber() {
 @Override
 public void onCompleted() {

 }

 @Override
 public void onError(Throwable e) {

 }

 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  count.addAndGet(integer);

  Log.d("TAG", "onNext: " + count);
 }
});

final SerializedSubject ser = new SerializedSubject(subject);

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;

  for(int i = 0;i < 10;i++){
   new Thread(new Runnable() {
    @Override
    public void run() {
     for(int j = 0;j < 1000;j++){
      ser.onNext(unit);
     }
    }
   }).start();
  }
 }
});

SerializedSubjectのonNextメソッドを見てみましょう.

@Override
public void onNext(T t) {
 if (terminated) {
  return;
 }
 synchronized (this) {
  if (terminated) {
   return;
  }
  if (emitting) {
   FastList list = queue;
   if (list == null) {
    list = new FastList();
    queue = list;
   }
   list.add(nl.next(t));
   return;
  }
  emitting = true;
 }
 try {
  actual.onNext(t);
 } catch (Throwable e) {
  terminated = true;
  Exceptions.throwOrReport(e, actual, t);
  return;
 }
 for (;;) {
  for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
   FastList list;
   synchronized (this) {
    list = queue;
    if (list == null) {
     emitting = false;
     return;
    }
    queue = null;
   }
   for (Object o : list.array) {
    if (o == null) {
     break;
    }
    try {
     if (nl.accept(actual, o)) {
      terminated = true;
      return;
     }
    } catch (Throwable e) {
     terminated = true;
     Exceptions.throwIfFatal(e);
     actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
     return;
    }
   }
  }
 }
}

処理は簡単で、他のスレッドがデータを送信している場合は、次の送信を待つためにキューにデータを配置します.これは、onNext、onComplete、onErrorなどのメソッドを呼び出すスレッドが同じ時間に1つしかないことを保証します.
しかし、このような操作は明らかに性能に影響を与えるため、RxJavaはすべての操作をスレッドの安全なラベルにつけることはありません.
ここでは、createメソッドの乱用に使用者が使用者に頻繁に呼び出されるべきではありません.すべてのデータ送信、受信の論理を慎重に処理しなければならないからです.逆に、既存のオペレータを使うとこの問題をうまく解決できるので、次回はcreateを使って簡単に自分で書くのではなく、既存のオペレータが必要なものを完成できるかどうかを考えるべきです.
RxJavaのオペレータ
RxJavaには、マルチスレッドの同時実行に関連するオペレータもあります.mergeとconcat、および彼らのいくつかの変種オペレータについて説明します.
マルチスレッド送信データについては、送信時と同じ順序で結果を得る必要がある場合があります.この場合、mergeというオペレータを使用して複数の送信源を結合すると、一定の問題が発生します(例では非常に悪い例を示しています.createオペレータを使用しています.このような書き方を学ばないでください.ここでは単純に結果を証明するためです).

Observable o1 = Observable.create(new Observable.OnSubscribe() {
 @Override
 public void call(final Subscriber super Integer> subscriber) {
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     Thread.sleep(1000);
     subscriber.onNext(1);
     subscriber.onCompleted();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }).start();
 }
});
Observable o2 = Observable.create(new Observable.OnSubscribe() {
 @Override
 public void call(Subscriber super Integer> subscriber) {
  subscriber.onNext(2);
  subscriber.onCompleted();
 }
});

Observable.merge(o1,o2)
  .subscribe(new Subscriber() {
   @Override
   public void onCompleted() {

   }

   @Override
   public void onError(Throwable e) {

   }

   @Override
   public void onNext(Integer i) {
    Log.d("TAG", "onNext: " + i);
   }
  });

このようなシナリオに対して,我々が得た答えは,o 1が送信したデータを先に得てからo 2のデータを取得するのではなく,2,1である.
なぜなら、mergeは実際に何を伝えても、データの送信順序を管理しないからです.

@Override
public void onNext(Observable extends T> t) {
  if (t == null) {
    return;
  }
  if (t == Observable.empty()) {
    emitEmpty();
  } else
  if (t instanceof ScalarSynchronousObservable) {
    tryEmit(((ScalarSynchronousObservable extends T>)t).get());
  } else {
    InnerSubscriber inner = new InnerSubscriber(this, uniqueId++);
    addInner(inner);
    t.unsafeSubscribe(inner);
    emit();
  }
}

lift操作後、対応する仲介者MergeSubscriberのonNextは、余計なコードがないので、複数のObservableがマルチスレッドからデータを送信する場合、順序が保証されないのは当然である.
一つの単語はこの問題を説明する:interleaving――交錯.merge後のデータソースはインタリーブされている可能性があります.mergeではこのようなデータ交錯の問題があるため,その変種であるflatMapにも同様の問題がある.
このようなシーンでは、concatオペレータを使用して次のことを実行できます.

Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

ドキュメントによると、concatオペレータはデータソースのデータを次々と処理していることがわかります.

if (wip.getAndIncrement() != 0) {
  return;
}

final int delayErrorMode = this.delayErrorMode;

for (;;) {
  if (actual.isUnsubscribed()) {
    return;
  }

  if (!active) {
    if (delayErrorMode == BOUNDARY) {
      if (error.get() != null) {
        Throwable ex = ExceptionsUtils.terminate(error);
        if (!ExceptionsUtils.isTerminated(ex)) {
          actual.onError(ex);
        }
        return;
      }
    }

    boolean mainDone = done;
    Object v = queue.poll();
    boolean empty = v == null;

    if (mainDone && empty) {
      Throwable ex = ExceptionsUtils.terminate(error);
      if (ex == null) {
        actual.onCompleted();
      } else
      if (!ExceptionsUtils.isTerminated(ex)) {
        actual.onError(ex);
      }
      return;
    }

    if (!empty) {

      Observable extends R> source;

      try {
        source = mapper.call(NotificationLite.instance().getValue(v));
      } catch (Throwable mapperError) {
        Exceptions.throwIfFatal(mapperError);
        drainError(mapperError);
        return;
      }

      if (source == null) {
        drainError(new NullPointerException("The source returned by the mapper was null"));
        return;
      }

      if (source != Observable.empty()) {

        if (source instanceof ScalarSynchronousObservable) {
          ScalarSynchronousObservable extends R> scalarSource = (ScalarSynchronousObservable extends R>) source;

          active = true;

          arbiter.setProducer(new ConcatMapInnerScalarProducer(scalarSource.get(), this));
        } else {
          ConcatMapInnerSubscriber innerSubscriber = new ConcatMapInnerSubscriber(this);
          inner.set(innerSubscriber);

          if (!innerSubscriber.isUnsubscribed()) {
            active = true;

            source.unsafeSubscribe(innerSubscriber);
          } else {
            return;
          }
        }
        request(1);
      } else {
        request(1);
        continue;
      }
    }
  }
  if (wip.decrementAndGet() == 0) {
    break;
  }
}

ソースコードにより、activeフィールドは、前のデータソースがまだデータを送信していない場合、前のデータソースがデータを送信してactiveフィールドをリセットするまでforサイクルで待機することを保証します.
concatでは、複数のObservableがシリアル化され、RxJavaイベントフロー全体の処理時間が大幅に増加するという問題もあります.このシーンではconcatEagerを使用して解決できます.concatEagerのソースコードはみんなの分析を持っていないので、興味のある学生は自分で見ることができます.
まとめ
この文章は比較的に短くて、話すものも比較的に浅くて、実はRxJavaの中でマルチスレッドの同時のいくつかの問題を討論しました.最後に、RxJavaは大したものではありません.あなたのプロジェクトが導入される前に、本当にそうする必要があるかどうかを考えてみましょう.本当にRxJavaが必要なシーンがあっても、プロジェクト内のすべての操作を一気にRxJavaに変えないでください.簡単な操作では、RxJavaのオペレータを使用する必要はありません.逆にコードの可読性を低下させ、Rxを使用するためにRxを使用しないでください.
はい、以上はこの文章のすべての内容で、本文の内容がみんなの学习あるいは仕事に対して一定の助けをもたらすことができることを望んで、もし疑问があればみんなは伝言を残して交流することができます.