scala並列集合のsparkへの応用
一.scalaパラレルコレクション
次に示すように、各要素を処理するコレクションがあります.
観測スレッド
二.sparkでの使用
100枚のテーブルからデータを取り出し、データ洗浄を行うなど、相互に関連のないタスクをn個実行する必要があります.sparkのメカニズムに従って、jobが最初に使用した同時度はデータのpartition数(hdfsではデータのブロック数、esとcassで読み取ったデータはデータのコピー数)である.
すべてのcoreは、タスクに100個割り当てられていても、データ読み出しフェーズではいくつかしか使用されません.
パラレルコレクションのモードに変更すると、プログラムは最大の同時度(core数)でデータソースからデータを引き出し、プログラムの実行効率を大幅に向上させます.
もちろん
次に示すように、各要素を処理するコレクションがあります.
val arr = List[String]("a","b","c")
arr.foreach(println(_))
// :
a
b
c
//
arr.par.foreach(println(_))
// :
a
c
b
観測スレッド
println((0 to 1000).map{r => Thread.currentThread.getName}.distinct)
Vector(main)
println((0 to 1000).par.map{r => Thread.currentThread.getName}.distinct)
ParVector(ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-7, ForkJoinPool-1-worker-3, ForkJoinPool-1-worker-1)
二.sparkでの使用
100枚のテーブルからデータを取り出し、データ洗浄を行うなど、相互に関連のないタスクをn個実行する必要があります.sparkのメカニズムに従って、jobが最初に使用した同時度はデータのpartition数(hdfsではデータのブロック数、esとcassで読み取ったデータはデータのコピー数)である.
すべてのcoreは、タスクに100個割り当てられていても、データ読み出しフェーズではいくつかしか使用されません.
val table_list = List[String]("table1","table2","table3","...")
val iter = arr.iterator
while(iter.hasNext){
val tableName = iter.next
// table
val data = ...
//
data.map(...)
}
パラレルコレクションのモードに変更すると、プログラムは最大の同時度(core数)でデータソースからデータを引き出し、プログラムの実行効率を大幅に向上させます.
val table_list = List[String]("table1","table2","table3","...")
table_list.par.foreach(r=>{
val tableName = r
// table
val data = ...
//
data.map(...)
})
もちろん
scala.concurrent
パックの中のFutureなどを使って同じ役割を果たすこともできますimport scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
val arr = List[String]("a","b","c")
val futures = arr.map(r=>Future{
val tableName = r
// table
val data = ...
//
data.map(...)
})
val future = Future.sequence(futures)
//
Await.result(future, 1 hour)