ゼロからSpark(三)RDDプログラミングを学ぶ
4871 ワード
RDDプログラミング
RDD(Resilient Distributed Datasetフレックス分散データセット)はSparkの中で最も核心的な概念である.RDDをマスターしてSparkを理解して、この章はいくつかの最も簡単な例を通じてRDDのScalaプログラミングを実現します.RDDのコア操作は3つあり,作成,変換操作,アクション操作である.
1.概要
以下の最も簡単な例はRDDの一連の典型的な動作を表す.以下はPythonコードで、Scalaも基本的に同じです.第1行はRDD を作成した.の2行目は、RDDに対して変換動作を実行し、まだRDDを返している.SparkはこれらのRDDを不活性に計算するだけであることに注意してください.プログラムはあなたの操作を記録しただけですが、実際に実行するわけではありません.ファイルは読み込まれていません.filter操作は実行されていません.RDDの変換動作は、map()およびfilter()のような新しいRDDを返す動作である. の3行目は行動操作に対応しており、このときに実際にファイルを読み取り、操作を実行する.アクション操作は、ドライバプログラムに結果を返すか、外部システムに結果を書き込む操作であり、count()やfirst()などの実際の計算がトリガーされます.また、Int、Array、Stringなどのデータ型にも似ています.
2.RDDの作成
第一種外部データセットの読み込み
第二種既存の集合を利用する
RDDを作成する最も簡単な方法は、プログラムの既存のセットをSparkContextのparallelize()メソッドに渡すことです.
プロトタイプの開発とテストを除いて、この方法はあまり使われていません.結局、この方法はデータセット全体を1台のマシンのメモリに置く必要があります.
3.変換操作
各要素に対する変換操作 map()は、RDDの各要素に使用される関数を受信し、関数の戻り結果を結果として使用します.
文字列RDDがあり、map()関数が文字列を解析してDouble値を返すために使用されている場合、入力RDDタイプはRDD[String]であり、出力タイプはRDD[Double]である. filter()は、関数を受信し、RDDの関数を満たす要素を新しいRDDに入れて返します.伝達された関数の戻り値はブール型でなければなりません. flatMap()の関数は、入力RDDの各要素にそれぞれ適用される.ただし、返されるのは要素ではなく、値シーケンスを返す反復器です.出力されたRDDは反復器で構成されていない.各反復器がアクセス可能なすべての要素を含むRDDを得た. distinct()操作は重複元素を除去することができるが、これはデータの混練に関連し、効率が非常に低下する.
擬似集合操作、2つのRDD操作
次の4つの方法は,並列,交差,差分,デカルト積をそれぞれ実現した. union(other)は、2つのRDDのすべての要素を含むRDDを返すコレクション操作です.unionは重くならない. intersection(other)メソッドは、2つのRDDに存在する要素のみを返します.注意intersection()は、データの混洗にも関与し、効率が非常に低下します. subtract(other)関数は、パラメータとして別のRDDを受信し、第1のRDDにのみ存在し、第2のRDDには存在しないすべての要素からなるRDDを返す. cartesian(other)変換操作は、可能なすべての(a,b)対 を返す.
4.行動操作 collect()関数は、RDD全体のデータを取得するために使用することができる.collect()は、データセット全体を単一のマシンのメモリに格納できる場合にのみ使用できます.そのため、collect()は大規模なデータセットでは使用できません. reduce()関数は、2つのRDDの要素タイプのデータを操作し、同じタイプの新しい要素を返す関数をパラメータとして受信します. fold()はreduce()と同様に、reduce()が受信した関数署名と同じ関数を受信し、各パーティションが最初に呼び出されたときの結果として「初期値」を追加します. aggregate()関数は、戻り値タイプが動作するRDDタイプと同じでなければならないという制限から解放されます.たとえば、平均値を計算する場合は、ループ中のカウントと要素の数を記録する必要があります.これには、二元グループを返す必要があります.具体的な使い方についてはこちらをご覧ください. take(n)は、RDD内のn要素を返し、できるだけ少ないパーティションにのみアクセスしようとするので、この操作は不均衡な集合を得る. top()は、RDDから最初の要素を取得する. foreach()アクションは、RDDをローカルに戻す必要がなく、RDDの各要素を動作させる.foreachは値を返していません.
6.注意すべき点
オブジェクトがオブジェクトのメンバーであるか、オブジェクトのフィールドへの参照が含まれている場合(self.fieldなど)、Sparkはオブジェクト全体をワークノードに送信します.これは、あなたが伝えたいものよりもずっと大きい可能性があります.
python正しい書き方
Scala正しい書き方
7.SparkとHadoopの違い
Hadoop MapReduceのようなシステムでは、開発者は、MapReduceのサイクル数を減らすために、操作を組み合わせる方法を考えるのに多くの時間を費やすことが多い.Sparkでは,非常に複雑なマッピングを書くことは,多くの簡単な連続動作を用いるよりも多くの性能を得ることができるとは限らない.根本的な原因はSparkの変換操作が不活性操作であることである.
RDD(Resilient Distributed Datasetフレックス分散データセット)はSparkの中で最も核心的な概念である.RDDをマスターしてSparkを理解して、この章はいくつかの最も簡単な例を通じてRDDのScalaプログラミングを実現します.RDDのコア操作は3つあり,作成,変換操作,アクション操作である.
1.概要
以下の最も簡単な例はRDDの一連の典型的な動作を表す.以下はPythonコードで、Scalaも基本的に同じです.
lines = sc.textFile("README.md") //
pythonLines = lines.filter(lambda line: "Python" in line) //
pythonLines.first() //
2.RDDの作成
第一種外部データセットの読み込み
val lines = sc.textFile("README.md")
第二種既存の集合を利用する
RDDを作成する最も簡単な方法は、プログラムの既存のセットをSparkContextのparallelize()メソッドに渡すことです.
プロトタイプの開発とテストを除いて、この方法はあまり使われていません.結局、この方法はデータセット全体を1台のマシンのメモリに置く必要があります.
val lines = sc.parallelize(List("pandas", "i like pandas"))
3.変換操作
各要素に対する変換操作
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
文字列RDDがあり、map()関数が文字列を解析してDouble値を返すために使用されている場合、入力RDDタイプはRDD[String]であり、出力タイプはRDD[Double]である.
newlines = lines.filter(line => line.contains("error"))
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // "hello"
擬似集合操作、2つのRDD操作
次の4つの方法は,並列,交差,差分,デカルト積をそれぞれ実現した.
val a = sc.parallelize(List(1, 2, 3))
val b = sc.parallelize(List(3, 4, 5))
a.cartesian(b) //{(1, 3), (1, 4), ... (3, 5)}, RDD
4.行動操作
val sum = rdd.reduce((x, y) => x + y) //
rdd.fold(0)((x, y) => x + y)
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
line.foreach(println)
8. RDD, mean() variance() RDD , join() RDD 。
## 5. persist( )
Spark RDD , RDD。 RDD ,Spark RDD 。 , 。 persist , 。
```scala
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
6.注意すべき点
オブジェクトがオブジェクトのメンバーであるか、オブジェクトのフィールドへの参照が含まれている場合(self.fieldなど)、Sparkはオブジェクト全体をワークノードに送信します.これは、あなたが伝えたいものよりもずっと大きい可能性があります.
python正しい書き方
class WordFunctions(object):
...
def getMatchesNoReference(self, rdd):
# : query = self.query
return rdd.filter(lambda x: query in x)
Scala正しい書き方
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// : val query_ = this.query
rdd.map(x => x.split(query_))
}
7.SparkとHadoopの違い
Hadoop MapReduceのようなシステムでは、開発者は、MapReduceのサイクル数を減らすために、操作を組み合わせる方法を考えるのに多くの時間を費やすことが多い.Sparkでは,非常に複雑なマッピングを書くことは,多くの簡単な連続動作を用いるよりも多くの性能を得ることができるとは限らない.根本的な原因はSparkの変換操作が不活性操作であることである.