spark検索中位数
1403 ワード
基本的な考え方は、
最初のステップはまずpartitionで、一定範囲の数字を同じpartitionに入れます.
第2歩、各partitionのsizeを記録して、中位数がどのpartitionの中に落ちているかを知ることができます
第3歩では、第2部で見つけたpartitionをtoArrayし、sortし、中位数を取得するか、sortしないか、利用する中位数を取る関数を直接取る(後述)
データの準備:
データをロードし、上の3つの処理を行います.
最初のステップはまずpartitionで、一定範囲の数字を同じpartitionに入れます.
第2歩、各partitionのsizeを記録して、中位数がどのpartitionの中に落ちているかを知ることができます
第3歩では、第2部で見つけたpartitionをtoArrayし、sortし、中位数を取得するか、sortしないか、利用する中位数を取る関数を直接取る(後述)
データの準備:
import java.io.PrintWriter
val max=9999999
val parts=10
val out=new PrintWriter("number")
for (i
データをロードし、上の3つの処理を行います.
//
val file = sc.textFile("file:/root/number")
//
import org.apache.spark.Partitioner
val parter = new Partitioner {
def getPartition(key: Any): Int = key match {
case i: Int => i / (max / parts + 1)
case _ => parts - 1
}
val numPartitions = parts
}
// , partition ,
val g = file.groupBy((x: String) => x.toInt, parter).cache
// partition
val arr_partition = g.mapPartitions(x => Iterator(x.size)).collect
// ,
val harf = (file.count) / 2
var count = 0
var index = 0
var flag = true
for (i harf) {
flag = false
count -= i
} else {
index += 1
}
}
// ,array 0 , array index ,
g.mapPartitionsWithIndex((i, x) => {
if (i != index) {
Iterator(0)
} else {
val tmp = x.toArray.sorted
val (k, _) = tmp((harf - count).toInt)
Iterator(k)
}
}
).collect
並列処理をしなくても、速い列の思想で、すぐに中位数を取ることができて、次回また書くことができます