SparkのScala言語の一般的な応用例

4535 ワード

初心者として初めてSparkを学び、自分の心得を分かち合います.
        Sparkプログラミングを学ぶには、まずコンパイル環境を準備し、プログラミング言語を確定しなければならない.本人はScala言語、IntelliJ IDEAコンパイル環境を使用している.同時に、spark-assembly-1.3.1-hd-2.6.0.jar、scala-compiler.jar、scala-library.jar、scala-reflect.jarの4つのパッケージを準備しなければならない.この4つのパッケージをインポートしてこそ、自分のScalaプログラミングの旅を始めることができます.
       Hadoop環境が構築されていないため、Scalaプログラミングを練習する際、Hadoop上でHDFSのデータを読み取ることはできませんが、邪魔ではありません.プログラミングを練習するために、地元のtxtファイルを読み取り、結果をtxtに保存することができます.これにより、Spark RDDの強さを感じるだけでなく、プログラミングを練習する目的を達成することができます.以下では主にSpark RDDでよく使われる操作を例に挙げて説明する.
       まずSparkConf()を構成しなければなりません.一般的にはHDFS上のファイルを読み込みますが、ここではローカルtxtファイルを読み込み、SparkConf()を以下のように構成します.
conf=new SparkConf().setAppName("Test").setMaster("local[4]")

        説明:Local[N]:ローカルモードで、N個のスレッドを使用します.
        次のプログラムはcount()を使用して行数を統計します.
object yb {
  /*
             ,           
   */
  def main(args: Array[String]): Unit={
    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/Spark/     /data_format1/yb.txt")
    val countx=lines.count()//    
    println(countx)//  : 10485750 
  }
}

       頻度を統計し、頻度でソートします.
object yb {
  def main(args: Array[String]): Unit={

    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/Spark/     /data_format1/100W.txt")
    /*
    sortByKey     。1、true(  ),    。2、     (   )
    flatMap              。
     */
    val worldcounts=lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => a+b).map{case (key,value)=>(value,key)}.sortByKey(true,1)
    worldcounts.foreach(println)
  }
}
 
          Map()和flatMap()区别: 
  
object yb{
   def main (args: Array[String]) {
     val m=List(List(1,2),List(3,4))
     println(m.map(x=>x))
     println(m)
     val x=m.flatten
     println(x)
     println(m.flatMap(x =>x))
  }

      上記のプログラムから得られるflatMapは、Mapとflattenが統合されているとともに、flatMapは最終的に一連のシーケンスを出力し、Map出力は複数の集合であることがわかります.
       union()の使い方:
object yb{
  def main(args: Array[String]) {
    val m1=List(List(1,2),List(3,4))
    val m2=List(List(1,2),List(3,4))
    val unionx=m1.union(m2)//          
    println(unionx)
    val mx1=List(1,2)
    val mx2=List(3,4)
    val unionxx=mx1.union(mx2)//          
    println(unionxx)
  }
}

      デカルト積cartesian()の使い方:
object yb{
  def main(args: Array[String]) {
    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val data1=sc.parallelize(List(1,2,3))//   ,          RDD  ,     RDD   。
    val data2=sc.parallelize(List(4,5,6))
    data1.cartesian(data2).foreach(println)
  }
}

      groupByKey()とreduceByKey()の違い:
object yb {
  def main(args: Array[String]){
    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/Spark/     /data_format1/100W.txt")
    /*
    sortByKey     。1、true(  ),    。2、     (   )
     */
    val worldcounts=lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => a+b).map{case (key,value)=>(value,key)}.sortByKey(false,1)//           
    val topK=worldcounts.top(10)
    topK.foreach(println)//         
   
  }
}

    groupByKeyはローカルmergeではなくメインノードmergeで統一    reduceByKeyはローカルmergeでプライマリノードmergeに
      reduce()の使い方:
object yb{
  def main(args: Array[String]) {
    val data=List(1,2,3,4)
   val sum=data.reduce((x,y)=>x+y)
    println(sum)//  :10
  }
}//reduce RDD            ,         ,      RDD                          
                               。