scala並列集合のsparkへの応用

4239 ワード

一.scalaパラレルコレクション
次に示すように、各要素を処理するコレクションがあります.
    val arr = List[String]("a","b","c")
    arr.foreach(println(_))
//    : 
a
b
c

//    
    arr.par.foreach(println(_))
//    : 
a
c
b

観測スレッド
  println((0 to 1000).map{r => Thread.currentThread.getName}.distinct)

Vector(main)

  println((0 to 1000).par.map{r => Thread.currentThread.getName}.distinct)

ParVector(ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-7, ForkJoinPool-1-worker-3, ForkJoinPool-1-worker-1)

二.sparkでの使用
100枚のテーブルからデータを取り出し、データ洗浄を行うなど、相互に関連のないタスクをn個実行する必要があります.sparkのメカニズムに従って、jobが最初に使用した同時度はデータのpartition数(hdfsではデータのブロック数、esとcassで読み取ったデータはデータのコピー数)である.
すべてのcoreは、タスクに100個割り当てられていても、データ読み出しフェーズではいくつかしか使用されません.
    val table_list = List[String]("table1","table2","table3","...")

val iter = arr.iterator

while(iter.hasNext){
    val tableName = iter.next
    //  table        
    val data = ...
    //      
    data.map(...)
}

パラレルコレクションのモードに変更すると、プログラムは最大の同時度(core数)でデータソースからデータを引き出し、プログラムの実行効率を大幅に向上させます.
    val table_list = List[String]("table1","table2","table3","...")

    table_list.par.foreach(r=>{
    val tableName = r
    //  table        
    val data = ...
    //      
    data.map(...)
})

もちろんscala.concurrentパックの中のFutureなどを使って同じ役割を果たすこともできます
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

    val arr = List[String]("a","b","c")

    val futures = arr.map(r=>Future{
    val tableName = r
    //  table        
    val data = ...
    //      
    data.map(...)
  })

  val future = Future.sequence(futures)

  //        
  Await.result(future, 1 hour)