spark検索中位数

1403 ワード

基本的な考え方は、
最初のステップはまずpartitionで、一定範囲の数字を同じpartitionに入れます.
第2歩、各partitionのsizeを記録して、中位数がどのpartitionの中に落ちているかを知ることができます
第3歩では、第2部で見つけたpartitionをtoArrayし、sortし、中位数を取得するか、sortしないか、利用する中位数を取る関数を直接取る(後述)
データの準備:
import java.io.PrintWriter

val max=9999999
val parts=10

val out=new PrintWriter("number")
for (i 

データをロードし、上の3つの処理を行います.
//      
val file = sc.textFile("file:/root/number")
//      
 import org.apache.spark.Partitioner
  val parter = new Partitioner {
    def getPartition(key: Any): Int = key match {
      case i: Int => i / (max / parts + 1)
      case _ => parts - 1
    }

    val numPartitions = parts
  }
//  ,     partition  ,    
  val g = file.groupBy((x: String) => x.toInt, parter).cache
//    partition   
 val arr_partition = g.mapPartitions(x => Iterator(x.size)).collect
//           ,        
  val harf = (file.count) / 2
  var count = 0
  var index = 0
  var flag = true
  for (i  harf) {
      flag = false
      count -= i
    } else {
      index += 1
    }
  }

//      ,array    0     ,      array  index    ,     
  g.mapPartitionsWithIndex((i, x) => {
    if (i != index) {
      Iterator(0)
    } else {
      val tmp = x.toArray.sorted
      val (k, _) = tmp((harf - count).toInt)
      Iterator(k)
    }
  }
  ).collect
並列処理をしなくても、速い列の思想で、すぐに中位数を取ることができて、次回また書くことができます