Akka-Streamで非同期並列処理


結構前からよく
「大規模データ処理業務経験者 優遇」
なんていう記事、広告などをちらほら見かける気がして、いろいろググってみたところ、衝撃的な記事を発見。
バッチを Akka Streams で再実装したら100倍速くなった話 #ScalaMatsuri

プロジェクト仕様とライブラリとの相性とかもあるだろうから、何でもかんでもAkka-Stream使えば早くなる、という問題でもないでしょうが、そうは言っても、こういった効率改善事例を実現させられたら、エンジニア冥利につきるでしょう。
上記のリンクではAkka-Stream以外にも、DBにアクセスするためのSlick使ったり、単純にサーバの実メモリ上げることで、総合的に295倍の効率アップを実現しているようです。

Slickの勉強は次回に回すとして、今回はAkka-Streamの使い方と、CPUマルチコアを使った非同期処理を本家サイト等眺めながら調査しました。

注意:Akka2.4.7で動かしてます。
Akkaに限らず、Scalaのライブラリは更新が頻繁なのでバージョンの違いには注意が必要です!

検証コード

import akka.actor.{ActorSystem}
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Future

object AkkaStreamTest {

  def main(args: Array[String]) = useStream

  val INTERVAL_STEP_A: Int = 1000
  val INTERVAL_STEP_B: Int = 1000
  val INTERVAL_STEP_C: Int = 1000
  val DATA_NUM: Int = 10

  def useStream (): Unit = {

    implicit val system = ActorSystem("toyo-sample")
    implicit val materializer = ActorMaterializer(
      ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = 64)
    )

    val source = Source(1 to DATA_NUM).buffer(1, OverflowStrategy.backpressure)

    val stepA = Flow[Int].map { i =>
      println(s"Task ${i}: stepA開始")
      Thread.sleep(INTERVAL_STEP_A.toLong)
      println(s"Task ${i}: stepA終了")
      i
    }.async
    val stepB = Flow[Int].map { i =>
      println(s"Task ${i}: stepB開始")
      Thread.sleep(INTERVAL_STEP_B.toLong)
      println(s"Task ${i}: stepB終了")
      i
    }.async
    val stepC = Flow[Int].map { i =>
      println(s"Task ${i}: stepC開始")
      Thread.sleep(INTERVAL_STEP_C.toLong)
      println(s"Task ${i}: stepC終了")
      i
    }.async

    val sink = Sink.last[Int]

    val graph = GraphDSL.create(sink) { implicit builder => sink =>
      import GraphDSL.Implicits._
      source ~> stepA ~> stepB ~> stepC ~> sink
      ClosedShape
    }
    val rg = RunnableGraph.fromGraph[Future[Int]](graph)
    import scala.concurrent.ExecutionContext.Implicits.global
    val res = rg.run
    res.onSuccess {
      case i: Int => println(i);system.terminate()
    }
  }
}

Akka-Streamの登場人物

Source

1個の出力口を持つ。各要素の処理の始点。引数に指定したコレクション(Stream)の順番に沿って要素(Element)を順次放出する。ちなみに例ではRangeで1〜10を順番に放出。

また.buffer(1, OverflowStrategy.backpressure)は、処理待ち要素を実メモリのタスクキューに積む時の設定であり、Reactive Streamsにおけるback-pressureを実現する仕組み。デフォでOverflowStrategy.backpressure。とりあえずデフォのままでいいかと思われる。

Flow

1個の入力口と1個の出力口を持つ。Flow[Int].map{}や、Flow[Int].filter{}などで実装される。Processing Stageとも呼ばれ、要素データの加工工程の単位。一個一個のFlowが内部でそれぞれActorとして動いてるイメージ。なおデフォルトでは同期処理を保っており、あるデータ要素がすべてのステップを完遂するまで、次のデータ要素の処理が開始されない。.asyncをつけることで非同期になり、処理が倍速に。

Sink

1個の入力口を持つ。各要素の処理の終点。基本的には何も出力しない(返り値の型が"NotUsed")。ただし、全要素の全処理の計算評価が完了した後に、その評価結果(Mat:Materialized Value)をGraph(後述)の外部に吐き出したりすることが可能。今回の例では、Sink.lastとして実装し、最後の要素(10: Int)が計算終了としたことをGraph外部に投げることで、全処理が完了したことをシステムに知らせている。(放置してるとActorSystemはメッセージを受け付けたまま待機しっぱなしなので。なんか他にいい終了方法あれば教えて下さい><)

最後の要素(10)が完了したことを持って、全要素全処理が完了したと見なせるかというと、Akka-Streamが要素の排出・処理の順番をちゃんと管理しているから。

ためしにStepCを

    val stepC = Flow[Int].map { i =>
      println(s"Task ${i}: stepC開始")
      Thread.sleep(INTERVAL_STEP_C.toLong)
      if(i == 9) Thread.sleep(10000L)// 要素9の時だけやたら重くする
      println(s"Task ${i}: stepC終了")
      i
    }.async

という風に変えたところ、9のStepCが完了するまで10のStepCが実行開始されなかった。

Graph

Source、Flow、Sinkなどの各処理過程をひとまとまりにしたもの。
Graphを一つの巨大なFlowとしても見ることができる。

内部で完結したGraphはClosedGraph、入出力口が1個ずつあるGraphはFlowGraphなど。
実行可能なGraphはRunnableGraph。.run()でGraph内のSourceから要素が発信され始める。
Graphから最終的なMat(Materialized Value)を吐き出すためにちょっとコツがいる。

出力結果

Akka-Stream使わずに普通に(1 to 10).map(stepA).map(stepB).map(stepC)
で直列処理したところ各Stepに1秒として
3 Step X 10要素
でメインの処理だけでも約30秒かかった。
さて、非同期並列処理したところ。。。

SO39-61348:play_test2 t_kamata$ time activator "runMain application.tasks.AkkaStreamTest"
[info] Loading project definition from /Users/t_kamata/Documents/workspace/play_test2/project
[info] Set current project to play_test2 (in build file:/Users/t_kamata/Documents/workspace/play_test2/)
[warn] compile:run::javaOptions will be ignored, compile:run::fork is set to false
Warning: node.js detection failed, sbt will use the Rhino based Trireme JavaScript engine instead to run JavaScript assets compilation, which in some cases may be orders of magnitude slower than using node.js.
[info] Compiling 1 Scala source to /Users/t_kamata/Documents/workspace/play_test2/target/scala-2.11/classes...
[info] Running application.tasks.AkkaStreamTest 
Task 1: stepA開始
Task 1: stepA終了
Task 2: stepA開始
Task 1: stepB開始
Task 2: stepA終了
Task 1: stepB終了
Task 3: stepA開始
Task 2: stepB開始
Task 1: stepC開始
Task 3: stepA終了
Task 1: stepC終了
Task 2: stepB終了
Task 4: stepA開始
Task 2: stepC開始
Task 3: stepB開始
Task 2: stepC終了
Task 4: stepA終了
Task 3: stepB終了
Task 5: stepA開始
Task 4: stepB開始
Task 3: stepC開始
Task 4: stepB終了
Task 3: stepC終了
Task 5: stepA終了
Task 4: stepC開始
Task 5: stepB開始
Task 6: stepA開始
Task 4: stepC終了
Task 6: stepA終了
Task 5: stepB終了
Task 5: stepC開始
Task 6: stepB開始
Task 7: stepA開始
Task 5: stepC終了
Task 7: stepA終了
Task 6: stepB終了
Task 8: stepA開始
Task 7: stepB開始
Task 6: stepC開始
Task 6: stepC終了
Task 8: stepA終了
Task 7: stepB終了
Task 9: stepA開始
Task 8: stepB開始
Task 7: stepC開始
Task 7: stepC終了
Task 8: stepB終了
Task 9: stepA終了
Task 8: stepC開始
Task 10: stepA開始
Task 9: stepB開始
Task 8: stepC終了
Task 10: stepA終了
Task 9: stepB終了
Task 9: stepC開始
Task 10: stepB開始
Task 10: stepB終了
Task 9: stepC終了
Task 10: stepC開始
Task 10: stepC終了
10
[success] Total time: 17 s, completed 2016/06/28 16:29:28

real    0m24.902s
user    0m29.029s
sys 0m1.806s

17秒、目に見えて早くなっている。
注目すべきは各要素(Task)とStepの開始タイミングである。
Task 1: stepA終了
の直後に
Task 2: stepA開始
Task 1: stepB開始
が同時に実行開始。

その後、各秒毎に最大で3要素のStepが同時進行しているのがわかりますね。

参考文献
Akka本家ドキュメント
バッチを Akka Streams で再実装したら100倍速くなった話 #ScalaMatsuri
【Akka】Akka Streamsがめっちゃ便利すぎて脳汁が出た話し