Sparkのfold()とaggregate()関数

2500 ワード

転載は出典を明記してください.http://www.jianshu.com/p/15739e95a46e @贰拾贰画生
最近はsparkを勉強していますが、この二つの関数を理解するのに苦労しました.今記録してみます.
1.rdd.fold(value)(func)fold()関数といえば、reduce()関数を提示しなければなりません.彼らの違いは初期値です.reduce()関数はこのように書かれています.
rdd.reduce(func)
パラメータは関数です.この関数は、rdd中のすべてのデータに対して何らかの動作を行います.
val l = List(1,2,3,4)
l.reduce((x, y) => x + y)
このxについては、戻り値を指すが、yは、rddの各要素に対するエルゴードである.lのデータを積算するという意味です.flod()関数は、reduce()に対して初期値パラメータを追加した.
rdd.fold(value)(func)
scalaの文法は確かにおかしいです.二つのパラメータがある以上、二つのパラメータを一つの括弧に入れてはいけませんか?酔っています.このような書き方は確かに迷いやすいです.
val l = List(1,2,3,4)
l.fold(0)((x, y) => x + y)
この計算は実は0 + 1 + 2 + 3 + 4で、reduce()の計算は1 + 2 + 3 + 4で、初期値がない、或いはrddの最初の要素値はその初期値である.
2.rdd.aggregate(value)(seqOp、combOp)
さっきreduce()fold()と言いましたが、この二つの関数には一つの問題があります.つまり、それらの戻り値はrddのデータタイプと同じでなければならないということです.どういう意味ですか?例えば、先ほどの例では、lのデータはIntであり、reduce()flod()が戻ってくるのもIntでなければなりません.aggregate()関数はこの制限を破った.たとえば(Int, Int)に戻ります.これはとても役に立ちます.たとえば平均を計算する時.平均値を計算するには、二つの値が必要です.一つはrddの各要素のアキュムレータと、もう一つは要素カウントです.私は(0, 0)に初期化しました.では、
val l = List(1,2,3,4)
l.aggregate(0, 0)(seqOp, combOp)
seqOpcombOpはどう書きますか?combOpとはどういう意味ですか?私たちはseqOpを次のように書きます.
(x, y) => (x._1 + y, x._2 + 1)
これはどういう意味ですかreduce()関数について話します.
val l = List(1,2,3,4)
l.reduce((x, y) => x + y)
このxについては、戻り値を指すが、yは、rddの各要素に対するエルゴードである.aggregate()でこれも同じです.xは戻り値ではないですか?私の戻り値は(Int, Int)です.2つの要素があります.x._1x._2を使ってこれらの2つの要素を指すことができます.巡回完了して戻ってくるyは、アキュムレータと要素カウントです.このような関数があったら終わるはずですが、後のx._1 + yは何ですか?私たちの計算は分散計算であるので、この関数はアキュムレータを統合したものです.例えば、第1のノードが1および2を遍歴し、x._2 + 1を返し、第2のノードが3および4を遍歴し、(Int, Int)を返した場合、それらを統合するとcombOpになり、プログラムで書き込みます.
(x, y) => (x._1 + y._1, x._2 + y._2)
最後のプログラムはこうです.
val l = List(1,2,3,4)
r = l.aggregate(0, 0)((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
m = r._1 / r._2.toFload
(3, 2)は、要求された平均値である.