SparkのScala言語の一般的な応用例
4535 ワード
初心者として初めてSparkを学び、自分の心得を分かち合います.
Sparkプログラミングを学ぶには、まずコンパイル環境を準備し、プログラミング言語を確定しなければならない.本人はScala言語、IntelliJ IDEAコンパイル環境を使用している.同時に、spark-assembly-1.3.1-hd-2.6.0.jar、scala-compiler.jar、scala-library.jar、scala-reflect.jarの4つのパッケージを準備しなければならない.この4つのパッケージをインポートしてこそ、自分のScalaプログラミングの旅を始めることができます.
Hadoop環境が構築されていないため、Scalaプログラミングを練習する際、Hadoop上でHDFSのデータを読み取ることはできませんが、邪魔ではありません.プログラミングを練習するために、地元のtxtファイルを読み取り、結果をtxtに保存することができます.これにより、Spark RDDの強さを感じるだけでなく、プログラミングを練習する目的を達成することができます.以下では主にSpark RDDでよく使われる操作を例に挙げて説明する.
まずSparkConf()を構成しなければなりません.一般的にはHDFS上のファイルを読み込みますが、ここではローカルtxtファイルを読み込み、SparkConf()を以下のように構成します.
説明:Local[N]:ローカルモードで、N個のスレッドを使用します.
次のプログラムはcount()を使用して行数を統計します.
頻度を統計し、頻度でソートします.
Sparkプログラミングを学ぶには、まずコンパイル環境を準備し、プログラミング言語を確定しなければならない.本人はScala言語、IntelliJ IDEAコンパイル環境を使用している.同時に、spark-assembly-1.3.1-hd-2.6.0.jar、scala-compiler.jar、scala-library.jar、scala-reflect.jarの4つのパッケージを準備しなければならない.この4つのパッケージをインポートしてこそ、自分のScalaプログラミングの旅を始めることができます.
Hadoop環境が構築されていないため、Scalaプログラミングを練習する際、Hadoop上でHDFSのデータを読み取ることはできませんが、邪魔ではありません.プログラミングを練習するために、地元のtxtファイルを読み取り、結果をtxtに保存することができます.これにより、Spark RDDの強さを感じるだけでなく、プログラミングを練習する目的を達成することができます.以下では主にSpark RDDでよく使われる操作を例に挙げて説明する.
まずSparkConf()を構成しなければなりません.一般的にはHDFS上のファイルを読み込みますが、ここではローカルtxtファイルを読み込み、SparkConf()を以下のように構成します.
conf=new SparkConf().setAppName("Test").setMaster("local[4]")
説明:Local[N]:ローカルモードで、N個のスレッドを使用します.
次のプログラムはcount()を使用して行数を統計します.
object yb {
/*
,
*/
def main(args: Array[String]): Unit={
val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
val sc = new SparkContext(conf)
val lines = sc.textFile("E:/Spark/ /data_format1/yb.txt")
val countx=lines.count()//
println(countx)// : 10485750
}
}
頻度を統計し、頻度でソートします.
object yb {
def main(args: Array[String]): Unit={
val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
val sc = new SparkContext(conf)
val lines = sc.textFile("E:/Spark/ /data_format1/100W.txt")
/*
sortByKey 。1、true( ), 。2、 ( )
flatMap 。
*/
val worldcounts=lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => a+b).map{case (key,value)=>(value,key)}.sortByKey(true,1)
worldcounts.foreach(println)
}
}
Map()和flatMap()区别:object yb{ def main (args: Array[String]) { val m=List(List(1,2),List(3,4)) println(m.map(x=>x)) println(m) val x=m.flatten println(x) println(m.flatMap(x =>x)) }
上記のプログラムから得られるflatMapは、Mapとflattenが統合されているとともに、flatMapは最終的に一連のシーケンスを出力し、Map出力は複数の集合であることがわかります.
union()の使い方:object yb{ def main(args: Array[String]) { val m1=List(List(1,2),List(3,4)) val m2=List(List(1,2),List(3,4)) val unionx=m1.union(m2)// println(unionx) val mx1=List(1,2) val mx2=List(3,4) val unionxx=mx1.union(mx2)// println(unionxx) } }
デカルト積cartesian()の使い方:object yb{ def main(args: Array[String]) { val conf=new SparkConf().setAppName("Test").setMaster("local[4]") val sc = new SparkContext(conf) val data1=sc.parallelize(List(1,2,3))// , RDD , RDD 。 val data2=sc.parallelize(List(4,5,6)) data1.cartesian(data2).foreach(println) } }
groupByKey()とreduceByKey()の違い:object yb { def main(args: Array[String]){ val conf=new SparkConf().setAppName("Test").setMaster("local[4]") val sc = new SparkContext(conf) val lines = sc.textFile("E:/Spark/ /data_format1/100W.txt") /* sortByKey 。1、true( ), 。2、 ( ) */ val worldcounts=lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => a+b).map{case (key,value)=>(value,key)}.sortByKey(false,1)// val topK=worldcounts.top(10) topK.foreach(println)// } }
groupByKeyはローカルmergeではなくメインノードmergeで統一 reduceByKeyはローカルmergeでプライマリノードmergeに
reduce()の使い方:object yb{ def main(args: Array[String]) { val data=List(1,2,3,4) val sum=data.reduce((x,y)=>x+y) println(sum)// :10 } }//reduce RDD , , RDD 。