Spark RDDの2つの演算子の1つ:一般的なAction演算子の小結
5532 ワード
**RDD:**弾性分布式データセットは、特殊な集合であり、マルチソースをサポートし、フォールトトレランスメカニズムがあり、キャッシュされ、並列操作をサポートすることができ、1つのRDDは複数のパーティション内のデータセットを代表する.
RDDには2つの演算子がある:1.Transformation(変換):遅延Lazy計算に属し、1つのRDDが別のRDDに変換されたとき、すぐに変換されず、データセットの論理操作を覚えているだけである.2.Action(実行):Sparkジョブの実行をトリガし、本当に変換演算子の計算をトリガする.RDDにおける演算子の実行手順:入力:Sparkプログラムの実行中、データは外部データ空間(例えば分散ストレージ:textFileからHDFSを読み取るなど、parallelizeメソッドはScala集合またはデータを入力する)からSparkを入力し、データはSpark実行時データ空間に入り、Spark中のデータブロックに変換し、BlockManagerで管理する.実行:Sparkデータ入力によりRDDが形成された後、filterなどの変換演算子によりデータを操作し、RDDを新しいRDDに変換し、Action演算子によりSparkコミットジョブをトリガーすることができる.データを多重化する必要がある場合は、Cache演算子を使用してメモリにデータをキャッシュできます.出力:プログラムの実行が終了すると、Spark実行時のスペースが出力され、saveAsTextFileがHDFSに出力されるなどの分散型ストレージに格納されるか、Scalaデータまたはコレクションに格納される(collectがScalaコレクションに出力され、countがScala Int型データに戻る)
一般的なアクション演算子(コードは一番下に統一されています): reduce(func):関数funcによってセット内のすべての要素が集約されます.func関数は2つの同性の要素を受信し,1つの値を返す.この関数は関連性があり、正しく同時実行できることを確認する必要があります.この演算子はreduceByKeyのようにkeyによってグループ化されないので,全量の動作である. collect():Driverのプログラムでは、データセットのすべての要素を配列として返します.ただし、これは小さなデータ・サブセットを返す場合にのみ使用できます.そうしないと、OOM異常が発生しやすくなります. count():データセットの要素数(Longタイプの数)を返します. take(n):データセットの前のn(Intタイプ)要素からなる配列を返します.なお、この操作は複数のノードではなく、Driverが存在するノードで実行される.取得するデータ量が多い場合は、この演算子を使用しないようにすると、Driverが存在するノードの圧力が大きくなります. first():データセットの最初の要素(take(1)に類似)を返します. saveAsTextFile(path):データセットのすべての要素をtextfile形式でローカル、hdfsなどのファイルシステムの指定ディレクトリに保存します.SparkはtoString()メソッドを呼び出し、各要素を1行のテキスト保存に変換します. saveAsSequenceFile(path):データセットのすべての要素をローカル、hdfsなどのファイルシステムで指定されたディレクトリにsequencefile形式で保存します.しかしながら、この方法は、RDDの要素がkey−value対である必要があり、Writableインタフェースまたは暗黙的にWritableに変換できることを実装する(Sparkの基本タイプは、この変換を含む). foreach(func):データセットの各要素で関数funcを実行します. countByKey:reduceByKeyと同じ効果ですが、reduceByKeyはTransformation演算子です.
コード実装:
RDDには2つの演算子がある:1.Transformation(変換):遅延Lazy計算に属し、1つのRDDが別のRDDに変換されたとき、すぐに変換されず、データセットの論理操作を覚えているだけである.2.Action(実行):Sparkジョブの実行をトリガし、本当に変換演算子の計算をトリガする.RDDにおける演算子の実行手順:入力:Sparkプログラムの実行中、データは外部データ空間(例えば分散ストレージ:textFileからHDFSを読み取るなど、parallelizeメソッドはScala集合またはデータを入力する)からSparkを入力し、データはSpark実行時データ空間に入り、Spark中のデータブロックに変換し、BlockManagerで管理する.実行:Sparkデータ入力によりRDDが形成された後、filterなどの変換演算子によりデータを操作し、RDDを新しいRDDに変換し、Action演算子によりSparkコミットジョブをトリガーすることができる.データを多重化する必要がある場合は、Cache演算子を使用してメモリにデータをキャッシュできます.出力:プログラムの実行が終了すると、Spark実行時のスペースが出力され、saveAsTextFileがHDFSに出力されるなどの分散型ストレージに格納されるか、Scalaデータまたはコレクションに格納される(collectがScalaコレクションに出力され、countがScala Int型データに戻る)
一般的なアクション演算子(コードは一番下に統一されています):
コード実装:
package com.aura.spark
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ActionTest {
def main(args: Array[String]): Unit = {
// ,
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)
/**
* SparkContext
*
* setMaster() :
* Master URL
* :Master URL Spark
* (local):--Spark (Spark Driver Executor )
* local : Spark
* local[M] : Spark M
* local[*] : Spark
* local[M,N]: Spark M , Spark N
*/
val conf = new SparkConf()
.setAppName(s"${ActionTest.getClass.getSimpleName}")
.setMaster("local[*]")
val sc = new SparkContext(conf)
//
action_t(sc)
sc.stop()
}
def action_t(sc:SparkContext): Unit ={
val list = List(
"1,dashi",
"2,ersha",
"3,sansha",
"4,sisha"
)
val list_rdd:RDD[String] = sc.parallelize(list)
val list_map:RDD[(Int,String)] = list_rdd.map(line =>{
val tup = line.split(",")
(tup(0).toInt,tup(1))
})
//reduce
val list_red:(Int,String) = list_map.reduce((l1,l2) => {
val k = l1._1 + l2._1
val v = l1._2 + "_" + l2._2
(k,v)
})
println("----reduce :")
println(list_red)
//collect
// collect OOM , , filter(func)
val list_fil:RDD[(Int,String)] = list_map.filter(_._1 > 2)
// , :
//list_fil.collect().foreach(t => println(t + "\t"))
val list_col:Array[(Int,String)] = list_fil.collect()
println("----collect :")
list_col.foreach(t => println(t + "\t"))
//count
val coun:Long = list_map.count()
println("----count :")
println("list_map :" + coun)
//take
val list_take:Array[(Int,String)] = list_map.take(2)
println("----take :")
print(" list_map :")
list_take.foreach(t => println(t + "\t"))
//first( take(1))
val list_fir:(Int, String) = list_map.first()
println("----first :")
print(" list_map :" + list_fir)
//saveAsTextFile saveAsSequenceFile , saveAsTextFile
/**
* :file:///D:/dasha.txt
* hdfs-site.xml core-site.xml , hdfs file:///
* , D:/dasha.txt
*/
println("----saveAsTextFile ")
//list_map.saveAsTextFile("hdfs://dfs01/data/dasha.txt")
//foreach
// , : list_take.foreach(t => println(t + "\t"))
// func
var count = 1
val list_for:Unit = list.foreach(line => {
val words = line.split(",")
val tup = (words(1),words(0))
println(" list " + count + " :" + tup)
count += 1
})
}
}
実行結果:----reduce :
(10,ersha_dashi_sansha_sisha)
----collect :
(3,sansha)
(4,sisha)
----count :
list_map :4
----take :
list_map :(1,dashi)
(2,ersha)
----first :
list_map :(1,dashi)----saveAsTextFile
list 1 :(dashi,1)
list 2 :(ersha,2)
list 3 :(sansha,3)
list 4 :(sisha,4)
クリックして表示:一般的なTransformation演算子のまとめ