Coroutines私的メモ3〜Coroutines Flow〜


前回の続きとなります

Coroutines Flowとは

Coldデーターストリームの一種

Cold?

RxJavaのObservableにもありますが、データストリームはColdHotに大別できる

Cold

  • 1つの消費者(生産者の対義語)に対して購読関係を結ぶ (=個別に値を流していく)
  • Subscribeするまで値を流さない
  • 通知するデータのタイムラインが購読されるたびに生成
  • 大抵のObservableはCold

Hot

  • 複数の消費者(生産者の対義語)に対して購読関係を結ぶ
  • Subscribeしなくても値が流れていく
  • すでに作成した通知するデータストリームに、後から消費者が加わる
  • RxJavaでいうところのConnectableFlowable/ConnectableObserver

※上記2つの図はRxJavaリアクティブプログラミングから一部抜粋し、引用させていただきました

導入

def coroutines_ver = "1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_ver"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$coroutines_ver"

CoroutinesFlowの使い方の流れ

  • Flowでストリームを作成
  • (任意)オペレータによる中間処理
  • Flow.collectで値を受け取っていく

Flowの作り方

例としてここでは1から10までのストリームをあげてみます

  • flowOf
private fun getIntFlow1() = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  • asFlow()

Iterable, Iterator, Sequenceに対して使うことができます

private fun getIntFlow2() = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).asFlow()
  • flow {}
private fun getIntFlow3() = flow {
   repeat(10) {
       emit(it + 1)
   }
}

オペレータによる中間処理

Rxにあったfilterやmapなど中間処理の関数をサポートしています
中間処理をする時点ではsuspendではないためこの点も便利ですね

val ints = getIntFlow1()
    .filter { it > 3 }
    .map { value ->
        "Value: $value"
    }
    .drop(1)
    .take(2)

上記では3以上のものをフィルタリングし、文字列に変換し、最初の1つを削除して、その中から最初の2つを選ぶということを行っています

Experimentalなものも未だ多い現状ですが、他にも様々な中間処理が使えますのでこちらのExtension Functionsを御覧ください

Flow<T>.collect

Flow<T>及び途中の中間処理まではsuspend関数ではないのでどこでも定義ができます
一方、受け取り側のFlow.collectはsuspend関数のため、必ずコルーチンスコープ内で実行する必要があります
(逆にスコープの閉じ忘れがない事は利点になる)

val ints = getIntFlow1()
GlobalScope.launch(Dispatchers.Main) {
    ints.collect {
        delay(1000L)
        text.text = it
    }
}

※textはtextView

結果として1からはじまり、10まで1秒ごとにTextViewのtextに代入されViewに表示されていくことになります

Flow<T>.collectIndexed

collectと同様に値も取得できますし、indexも取得できます

さいごに

かんたんにCoroutines Flowについてとりあげました
次は今回は分量上避けていたFlowBuilderのうちの一つであるchannelFlow<T>に着目して取り上げてみたいと思います

参考にさせていただいたもの