Sparkのfold()とaggregate()関数
2500 ワード
転載は出典を明記してください.http://www.jianshu.com/p/15739e95a46e @贰拾贰画生
最近はsparkを勉強していますが、この二つの関数を理解するのに苦労しました.今記録してみます.
1.rdd.fold(value)(func)
2.rdd.aggregate(value)(seqOp、combOp)
さっき
最近はsparkを勉強していますが、この二つの関数を理解するのに苦労しました.今記録してみます.
1.rdd.fold(value)(func)
fold()
関数といえば、reduce()
関数を提示しなければなりません.彼らの違いは初期値です.reduce()
関数はこのように書かれています.rdd.reduce(func)
パラメータは関数です.この関数は、rdd中のすべてのデータに対して何らかの動作を行います.val l = List(1,2,3,4)
l.reduce((x, y) => x + y)
このx
については、戻り値を指すが、y
は、rddの各要素に対するエルゴードである.l
のデータを積算するという意味です.flod()
関数は、reduce()
に対して初期値パラメータを追加した.rdd.fold(value)(func)
scalaの文法は確かにおかしいです.二つのパラメータがある以上、二つのパラメータを一つの括弧に入れてはいけませんか?酔っています.このような書き方は確かに迷いやすいです.val l = List(1,2,3,4)
l.fold(0)((x, y) => x + y)
この計算は実は0 + 1 + 2 + 3 + 4
で、reduce()
の計算は1 + 2 + 3 + 4
で、初期値がない、或いはrddの最初の要素値はその初期値である.2.rdd.aggregate(value)(seqOp、combOp)
さっき
reduce()
とfold()
と言いましたが、この二つの関数には一つの問題があります.つまり、それらの戻り値はrddのデータタイプと同じでなければならないということです.どういう意味ですか?例えば、先ほどの例では、l
のデータはInt
であり、reduce()
とflod()
が戻ってくるのもInt
でなければなりません.aggregate()
関数はこの制限を破った.たとえば(Int, Int)
に戻ります.これはとても役に立ちます.たとえば平均を計算する時.平均値を計算するには、二つの値が必要です.一つはrddの各要素のアキュムレータと、もう一つは要素カウントです.私は(0, 0)
に初期化しました.では、val l = List(1,2,3,4)
l.aggregate(0, 0)(seqOp, combOp)
seqOp
とcombOp
はどう書きますか?combOpとはどういう意味ですか?私たちはseqOpを次のように書きます.(x, y) => (x._1 + y, x._2 + 1)
これはどういう意味ですかreduce()
関数について話します.val l = List(1,2,3,4)
l.reduce((x, y) => x + y)
このx
については、戻り値を指すが、y
は、rddの各要素に対するエルゴードである.aggregate()
でこれも同じです.x
は戻り値ではないですか?私の戻り値は(Int, Int)
です.2つの要素があります.x._1
とx._2
を使ってこれらの2つの要素を指すことができます.巡回完了して戻ってくるy
は、アキュムレータと要素カウントです.このような関数があったら終わるはずですが、後のx._1 + y
は何ですか?私たちの計算は分散計算であるので、この関数はアキュムレータを統合したものです.例えば、第1のノードが1および2を遍歴し、x._2 + 1
を返し、第2のノードが3および4を遍歴し、(Int, Int)
を返した場合、それらを統合するとcombOp
になり、プログラムで書き込みます.(x, y) => (x._1 + y._1, x._2 + y._2)
最後のプログラムはこうです.val l = List(1,2,3,4)
r = l.aggregate(0, 0)((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
m = r._1 / r._2.toFload
(3, 2)
は、要求された平均値である.