Akka-Streamで非同期並列処理
結構前からよく
「大規模データ処理業務経験者 優遇」
なんていう記事、広告などをちらほら見かける気がして、いろいろググってみたところ、衝撃的な記事を発見。
バッチを Akka Streams で再実装したら100倍速くなった話 #ScalaMatsuri
プロジェクト仕様とライブラリとの相性とかもあるだろうから、何でもかんでもAkka-Stream使えば早くなる、という問題でもないでしょうが、そうは言っても、こういった効率改善事例を実現させられたら、エンジニア冥利につきるでしょう。
上記のリンクではAkka-Stream以外にも、DBにアクセスするためのSlick使ったり、単純にサーバの実メモリ上げることで、総合的に295倍の効率アップを実現しているようです。
Slickの勉強は次回に回すとして、今回はAkka-Streamの使い方と、CPUマルチコアを使った非同期処理を本家サイト等眺めながら調査しました。
注意:Akka2.4.7で動かしてます。
Akkaに限らず、Scalaのライブラリは更新が頻繁なのでバージョンの違いには注意が必要です!
検証コード
import akka.actor.{ActorSystem}
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Future
object AkkaStreamTest {
def main(args: Array[String]) = useStream
val INTERVAL_STEP_A: Int = 1000
val INTERVAL_STEP_B: Int = 1000
val INTERVAL_STEP_C: Int = 1000
val DATA_NUM: Int = 10
def useStream (): Unit = {
implicit val system = ActorSystem("toyo-sample")
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = 64)
)
val source = Source(1 to DATA_NUM).buffer(1, OverflowStrategy.backpressure)
val stepA = Flow[Int].map { i =>
println(s"Task ${i}: stepA開始")
Thread.sleep(INTERVAL_STEP_A.toLong)
println(s"Task ${i}: stepA終了")
i
}.async
val stepB = Flow[Int].map { i =>
println(s"Task ${i}: stepB開始")
Thread.sleep(INTERVAL_STEP_B.toLong)
println(s"Task ${i}: stepB終了")
i
}.async
val stepC = Flow[Int].map { i =>
println(s"Task ${i}: stepC開始")
Thread.sleep(INTERVAL_STEP_C.toLong)
println(s"Task ${i}: stepC終了")
i
}.async
val sink = Sink.last[Int]
val graph = GraphDSL.create(sink) { implicit builder => sink =>
import GraphDSL.Implicits._
source ~> stepA ~> stepB ~> stepC ~> sink
ClosedShape
}
val rg = RunnableGraph.fromGraph[Future[Int]](graph)
import scala.concurrent.ExecutionContext.Implicits.global
val res = rg.run
res.onSuccess {
case i: Int => println(i);system.terminate()
}
}
}
Akka-Streamの登場人物
import akka.actor.{ActorSystem}
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Future
object AkkaStreamTest {
def main(args: Array[String]) = useStream
val INTERVAL_STEP_A: Int = 1000
val INTERVAL_STEP_B: Int = 1000
val INTERVAL_STEP_C: Int = 1000
val DATA_NUM: Int = 10
def useStream (): Unit = {
implicit val system = ActorSystem("toyo-sample")
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = 64)
)
val source = Source(1 to DATA_NUM).buffer(1, OverflowStrategy.backpressure)
val stepA = Flow[Int].map { i =>
println(s"Task ${i}: stepA開始")
Thread.sleep(INTERVAL_STEP_A.toLong)
println(s"Task ${i}: stepA終了")
i
}.async
val stepB = Flow[Int].map { i =>
println(s"Task ${i}: stepB開始")
Thread.sleep(INTERVAL_STEP_B.toLong)
println(s"Task ${i}: stepB終了")
i
}.async
val stepC = Flow[Int].map { i =>
println(s"Task ${i}: stepC開始")
Thread.sleep(INTERVAL_STEP_C.toLong)
println(s"Task ${i}: stepC終了")
i
}.async
val sink = Sink.last[Int]
val graph = GraphDSL.create(sink) { implicit builder => sink =>
import GraphDSL.Implicits._
source ~> stepA ~> stepB ~> stepC ~> sink
ClosedShape
}
val rg = RunnableGraph.fromGraph[Future[Int]](graph)
import scala.concurrent.ExecutionContext.Implicits.global
val res = rg.run
res.onSuccess {
case i: Int => println(i);system.terminate()
}
}
}
Source
1個の出力口を持つ。各要素の処理の始点。引数に指定したコレクション(Stream)の順番に沿って要素(Element)を順次放出する。ちなみに例ではRangeで1〜10を順番に放出。
また.buffer(1, OverflowStrategy.backpressure)は、処理待ち要素を実メモリのタスクキューに積む時の設定であり、Reactive Streamsにおけるback-pressureを実現する仕組み。デフォでOverflowStrategy.backpressure。とりあえずデフォのままでいいかと思われる。
Flow
1個の入力口と1個の出力口を持つ。Flow[Int].map{}や、Flow[Int].filter{}などで実装される。Processing Stageとも呼ばれ、要素データの加工工程の単位。一個一個のFlowが内部でそれぞれActorとして動いてるイメージ。なおデフォルトでは同期処理を保っており、あるデータ要素がすべてのステップを完遂するまで、次のデータ要素の処理が開始されない。.asyncをつけることで非同期になり、処理が倍速に。
Sink
1個の入力口を持つ。各要素の処理の終点。基本的には何も出力しない(返り値の型が"NotUsed")。ただし、全要素の全処理の計算評価が完了した後に、その評価結果(Mat:Materialized Value)をGraph(後述)の外部に吐き出したりすることが可能。今回の例では、Sink.lastとして実装し、最後の要素(10: Int)が計算終了としたことをGraph外部に投げることで、全処理が完了したことをシステムに知らせている。(放置してるとActorSystemはメッセージを受け付けたまま待機しっぱなしなので。なんか他にいい終了方法あれば教えて下さい><)
最後の要素(10)が完了したことを持って、全要素全処理が完了したと見なせるかというと、Akka-Streamが要素の排出・処理の順番をちゃんと管理しているから。
ためしにStepCを
val stepC = Flow[Int].map { i =>
println(s"Task ${i}: stepC開始")
Thread.sleep(INTERVAL_STEP_C.toLong)
if(i == 9) Thread.sleep(10000L)// 要素9の時だけやたら重くする
println(s"Task ${i}: stepC終了")
i
}.async
という風に変えたところ、9のStepCが完了するまで10のStepCが実行開始されなかった。
Graph
Source、Flow、Sinkなどの各処理過程をひとまとまりにしたもの。
Graphを一つの巨大なFlowとしても見ることができる。
内部で完結したGraphはClosedGraph、入出力口が1個ずつあるGraphはFlowGraphなど。
実行可能なGraphはRunnableGraph。.run()でGraph内のSourceから要素が発信され始める。
Graphから最終的なMat(Materialized Value)を吐き出すためにちょっとコツがいる。
出力結果
Akka-Stream使わずに普通に(1 to 10).map(stepA).map(stepB).map(stepC)
で直列処理したところ各Stepに1秒として
3 Step X 10要素
でメインの処理だけでも約30秒かかった。
さて、非同期並列処理したところ。。。
SO39-61348:play_test2 t_kamata$ time activator "runMain application.tasks.AkkaStreamTest"
[info] Loading project definition from /Users/t_kamata/Documents/workspace/play_test2/project
[info] Set current project to play_test2 (in build file:/Users/t_kamata/Documents/workspace/play_test2/)
[warn] compile:run::javaOptions will be ignored, compile:run::fork is set to false
Warning: node.js detection failed, sbt will use the Rhino based Trireme JavaScript engine instead to run JavaScript assets compilation, which in some cases may be orders of magnitude slower than using node.js.
[info] Compiling 1 Scala source to /Users/t_kamata/Documents/workspace/play_test2/target/scala-2.11/classes...
[info] Running application.tasks.AkkaStreamTest
Task 1: stepA開始
Task 1: stepA終了
Task 2: stepA開始
Task 1: stepB開始
Task 2: stepA終了
Task 1: stepB終了
Task 3: stepA開始
Task 2: stepB開始
Task 1: stepC開始
Task 3: stepA終了
Task 1: stepC終了
Task 2: stepB終了
Task 4: stepA開始
Task 2: stepC開始
Task 3: stepB開始
Task 2: stepC終了
Task 4: stepA終了
Task 3: stepB終了
Task 5: stepA開始
Task 4: stepB開始
Task 3: stepC開始
Task 4: stepB終了
Task 3: stepC終了
Task 5: stepA終了
Task 4: stepC開始
Task 5: stepB開始
Task 6: stepA開始
Task 4: stepC終了
Task 6: stepA終了
Task 5: stepB終了
Task 5: stepC開始
Task 6: stepB開始
Task 7: stepA開始
Task 5: stepC終了
Task 7: stepA終了
Task 6: stepB終了
Task 8: stepA開始
Task 7: stepB開始
Task 6: stepC開始
Task 6: stepC終了
Task 8: stepA終了
Task 7: stepB終了
Task 9: stepA開始
Task 8: stepB開始
Task 7: stepC開始
Task 7: stepC終了
Task 8: stepB終了
Task 9: stepA終了
Task 8: stepC開始
Task 10: stepA開始
Task 9: stepB開始
Task 8: stepC終了
Task 10: stepA終了
Task 9: stepB終了
Task 9: stepC開始
Task 10: stepB開始
Task 10: stepB終了
Task 9: stepC終了
Task 10: stepC開始
Task 10: stepC終了
10
[success] Total time: 17 s, completed 2016/06/28 16:29:28
real 0m24.902s
user 0m29.029s
sys 0m1.806s
17秒、目に見えて早くなっている。
注目すべきは各要素(Task)とStepの開始タイミングである。
Task 1: stepA終了
の直後に
Task 2: stepA開始
Task 1: stepB開始
が同時に実行開始。
その後、各秒毎に最大で3要素のStepが同時進行しているのがわかりますね。
参考文献
Akka本家ドキュメント
バッチを Akka Streams で再実装したら100倍速くなった話 #ScalaMatsuri
【Akka】Akka Streamsがめっちゃ便利すぎて脳汁が出た話し
Author And Source
この問題について(Akka-Streamで非同期並列処理), 我々は、より多くの情報をここで見つけました https://qiita.com/t_kamata/items/7d954f49b4d6bdb3fea4著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .