SparkSQL (10) DataFrame Basic API Operations - 1 - (DSL)


I. Comparing DataFrame and RDD
1. Similarity: both are distributed datasets, well suited to big-data processing.
2. Differences:
(1) An RDD has no knowledge of the internal structure of its elements.
(2) A DataFrame carries detailed structural information (a schema) and is similar to a database table.
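A minimal sketch of the contrast, assuming a SparkSession named spark (created as in the next section); the names and values are illustrative:

case class Person(name: String, age: Int)

import spark.implicits._
// RDD[Person]: Spark treats each element as an opaque object
val personRDD = spark.sparkContext.parallelize(Seq(Person("alice", 30), Person("bob", 25)))
// DataFrame: Spark knows the column names and types
val personDF = personRDD.toDF()
personDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)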
II. DataFrame Data Sources and Basic API Operations
1. DataFrame data sources
Official docs: http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#datasets-and-dataframes
Loading data into a DataFrame:
val spark = SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()
// load a JSON file as a DataFrame
val peopleDF = spark.read.format("json").load("datas/people.json")
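The same reader API handles other formats as well; a brief sketch (the file paths below are hypothetical):

// Parquet is the default format, so format() can be omitted
val usersDF = spark.read.load("datas/users.parquet")

// CSV with a header row and schema inference
val csvDF = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("datas/people.csv")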

2. Basic API operations
(1) printSchema: prints the schema of the DataFrame
peopleDF.printSchema()

(2) show: displays the first rows of the DataFrame (20 by default)
peopleDF.show()  // first 20 rows by default

// show the first 30 rows
peopleDF.show(30)

(3) select: queries all values of a given column
peopleDF.select("name").show()

(4) col: returns the Column object for a given column name, which can be used in expressions
// select several columns and compute a derived value on one of them
peopleDF.select(peopleDF.col("name"),(peopleDF.col("age")+10).as("age2")).show()

(5) where/filter: filters rows based on column values
peopleDF.filter(peopleDF.col("age")>19).show()

or:
// rows whose name starts with "M"
studentDF.filter("SUBSTR(name,0,1)='M'").show

(6) groupBy: groups by a column and applies aggregation operations
peopleDF.groupBy("age").count().show()
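count is only one aggregate; others can be applied through agg. A short sketch against the same peopleDF (assuming the usual people.json columns name and age):

import org.apache.spark.sql.functions._

peopleDF.groupBy("age")
  .agg(count(lit(1)).as("cnt"), collect_list("name").as("names"))
  .show()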

(7) sort/orderBy: global sort; like Hive's ORDER BY, it orders the entire dataset by the given fields
// sort by salary descending first, then by empAge ascending
PersonDF.sort($"salary".desc,$"empAge".asc).select("empName","salary","empAge").show

Alternative forms:
studentDF.sort("name","id").show

studentDF.sort(studentDF("name").asc, studentDF("id").desc).show

(8) limit: returns the first N rows
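For example, using the same peopleDF:

// take the first 4 rows as a new DataFrame and display them
peopleDF.limit(4).show()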
(9) sortWithinPartitions: partition-local sort; orders rows within each partition rather than across the whole dataset
  PersonDF.sortWithinPartitions($"salary".desc,$"empAge".asc)
    .select("empName","salary","empAge").show

3. Code examples
(1) Example 1
package _0729DF


import _0728sql.Avg_UDAF
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions._
/**
 * Spark DSL demo.
 * Spark SQL offers two styles: SQL strings and the DSL (DataFrame API).
 *
 * 1/ If you are comfortable with SQL, you can simply write SQL.
 * 2/ Some logic is awkward to express in a single SQL statement,
 *    for example chains of joins and filters across several tables.
 * 3/ When a SQL string grows long and hard to read or maintain, the DSL is usually clearer.
 * 4/ Spark keeps evolving and the DSL API improves with each release, so it is worth learning.
 */
case class Person1(empName:String,empAge:Int,empGender:String,salary:Float,deptNo:Int)
case class Dept1(deptNo:Int,deptName:String)

object DSLdemo extends App{

  // 1. create the Spark context
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("DSLdemo")

  val sc = SparkContext.getOrCreate(conf)
  val sqlContext = new HiveContext(sc)

    sqlContext.udf.register("gender2Num",(gender:String) => {
    gender.toUpperCase() match {
      case "M" => 0
      case "F" => 1
      case _ => -1
    }
  })


  // sample employee records (placeholder names)
  val rddp = sc.parallelize(Array(
    Person1("tom",21,"M",3231.2f,1),
    Person1("ben",23,"M",3412.2f,1),
    Person1("sam",52,"M",3562.2f,1),
    Person1("joe",21,"M",5655.2f,1),
    Person1("ann",23,"F",6574.2f,2),
    Person1("eva",21,"F",4547.2f,1),
    Person1("max",61,"M",3231.2f,2),
    Person1("mary",43,"F",4354.2f,1),
    Person1("leo",53,"M",3231.2f,1),
    Person1("jack",12,"M",1111.2f,3),
    Person1("jane",34,"F",2624.2f,1),
    Person1("lili",41,"F",6754.2f,3)
  ))

  val rddd = sc.parallelize(Array(
    Dept1(1,"dept1"),
    Dept1(2,"dept2"),
    Dept1(4,"dept4")
  ))
  import sqlContext.implicits._
  val deptDF: DataFrame = rddd.toDF()
  val PersonDF: DataFrame = rddp.toDF()
  PersonDF.registerTempTable("p")
  deptDF.cache()
  PersonDF.cache()
  //DSL
  println("===========================DSL===============================")

  /**
   * select empName, XXX, XXX from PersonDF
   *
   * select projects columns out of the DataFrame; besides plain column-name strings
   * it also accepts Column expressions (with a Dataset[Person] there is typed access as well),
   * so the projected columns can be transformed (map-style) with case-when, substring, ...
   */
  // Style 1: plain column-name strings
  PersonDF.select("empName","empAge","empGender").show()

  // Style 2: Column objects
  // $"..." is the StringToColumn implicit conversion; col() comes from functions
  //import org.apache.spark.sql.functions._
  PersonDF.select($"empName" as("empName1"),$"empAge" as("empAge1"),$"empGender" as("empGender1")).show()

  PersonDF.select(col("empName").as("empName2"),
    col("empAge").as("empAge2"),
    col("empGender").as("empGender2")).show()
  // Style 3: selectExpr with SQL expression strings (registered UDFs can be called):
  PersonDF.selectExpr("empName","empAge","gender2Num(empGender) as sex").show()


  println("===========================where/filter======================================")

  PersonDF.where("empAge > 30 AND empGender = 'M' AND deptNo = 1").show
  PersonDF.where("empAge > 30").where("empGender = 'M' AND deptNo = 1").show
  PersonDF.filter($"empAge" > 30 && $"empGender" === "F").show

  println("=============================sort===========================================")
  // ascending by default; use .asc/.desc to control the direction

  // sort by a single field
  PersonDF.sort("salary").select("empName","salary").show
  PersonDF.sort($"salary".desc).select("empName","salary").show

  // sort by salary descending first, then by empAge ascending
  PersonDF.sort($"salary".desc,$"empAge".asc).select("empName","salary","empAge").show


  println("============================order by=========================================")
  PersonDF.repartition(5).orderBy($"salary".desc,$"empAge".asc).select("empName","salary","empAge").show

  // partition-local sort
  println("============================sortWithinPartitions===============================")
  PersonDF.sortWithinPartitions($"salary".desc,$"empAge".asc)
    .select("empName","salary","empAge").show

  // aggregation
  println("============================group by==================================")
  // note: this string-pair form had a BUG in some releases
  // (observed in 1.6; the behavior depends on the Spark version)
  PersonDF.groupBy("empGender")
    .agg(
      "salary" -> "avg",
      "salary" -> "sum"
    ).show
  // another style:
  PersonDF.groupBy("empGender")
    .agg(
      avg("salary").as("avg1"),
      sum("salary").as("sum1"),
      count(lit(1)).as("cnt")
    ).show
  // equivalent SQL
  sqlContext.sql(
    """
      |select empGender,sum(salary)as sum_sql,avg(salary) as avg_sql
      |from p
      |group by empGender
    """.stripMargin).show
    println("==============================     ===================================")
  //       
  sqlContext.udf.register("self_avg",Avg_UDAF)

  PersonDF.groupBy("empGender")
    .agg(
      "salary" -> "self_avg"
    ).withColumn("test",lit(0))
    .show


  println("==============================limit===================================")
  PersonDF.limit(4).show()

  println("==============================join==========================================")
  /**
   * join types: inner, left outer, right outer, full outer, left semi
   */
  // both DataFrames have a deptno column, so a bare $"deptno" === $"deptno" is ambiguous:
  //PersonDF.join(deptDF,$"deptno"===$"deptno").show

  PersonDF.join(deptDF,"deptno").show

  // when both sides keep a same-named column, qualify it through each DataFrame's col()
  // or rename the columns first with toDF (which renames every column at once)
  PersonDF.join(deptDF,PersonDF.col("deptno")===deptDF.col("deptno"))
    //.toDF(/* new names for all columns of the joined result */)
    .select(PersonDF.col("deptno"),PersonDF.col("salary")).show

  deptDF.toDF("d1","dname").join(PersonDF,$"deptno"===$"d1").show
  println("--------------------------     -------------------------------")
  PersonDF.join(deptDF,Seq("deptNo"),"left").show
  PersonDF.join(deptDF,Seq("deptNo"),"right").show
  PersonDF.join(deptDF,Seq("deptNo"),"full").show
  //select * from deptDF where deptNo in (select deptNo from PersonDF)
  deptDF.join(PersonDF,Seq("deptNo"),"leftsemi").show



  println("=============================casewhen====================================")
  /**
   * select *,
   *   case when salary <= 2000 then 'low'
   *        when salary > 2000 and salary <= 4000 then 'medium'
   *        else 'high'
   *   end as salarylevel
   * from PersonDF
   *
   * in the DSL, chain when(...) calls and close with otherwise(...) for the else branch
   */
  PersonDF.select(PersonDF.col("empName"),PersonDF.col("salary"),
    when(PersonDF.col("salary") <= 2000,"low")
      .when(PersonDF.col("salary") > 2000 && PersonDF.col("salary") <= 4000,"medium")
      .otherwise("high").as("salarylevel")
  ).show()



  println("========================    ====================================")
  /**
   * 1. the window function below relies on HiveContext
   * 2. running with HiveContext may need JVM options: -XX:PermSize=128M -XX:MaxPermSize=256M
   * 3. take the top 3 rows per deptno ordered by salary; two equivalent SQL formulations
   *    (a row_number window function, or union all):
   *
   * select * from
   * (select *,
   * row_number() over (partition by deptno order by salary desc) as rnk
   * from PersonDF) a
   * where rnk <= 3
   *
   * select * from PersonDF where deptno = 1 order by salary desc limit 3
   * union all
   * select * from PersonDF where deptno = 2 order by salary desc limit 3
   * union all
   * select * from PersonDF where deptno = 3 order by salary desc limit 3
   */
  val w = Window.partitionBy("deptNo").orderBy($"salary".desc,$"empAge".asc)

  PersonDF.select($"empName",$"empAge",$"deptNo",$"salary",
  row_number().over(w).as("rnk")
  ).where($"rnk".leq(3)).show


//  println("=========================union ALL============================")
//  PersonDF.select($"empName",$"empAge",$"deptNo",$"salary"
//  ).where($"deptNo" === 1).sort($"salary".desc,$"empAge".asc).limit(3)
//  .unionAll(PersonDF.select($"empName",$"empAge",$"deptNo",$"salary"
//  ).where($"deptNo" === 2).sort($"salary".desc,$"empAge".asc).limit(3))
//  .unionAll(PersonDF.select($"empName",$"empAge",$"deptNo",$"salary"
//  ).where($"deptNo" === 3).sort($"salary".desc,$"empAge".asc).limit(3)).show
//
//
//
//
//  //Thread.sleep(1232323l)
//  deptDF.unpersist()
//  PersonDF.unpersist()

}

(2) Example 2
package MoocSparkSQL

import org.apache.spark.sql.SparkSession

/**
  * Basic DataFrame operations
  */
object DataFrameCase {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("datas/student.data") // load the raw text data

    // note: import the implicit conversions needed for toDF
    import spark.implicits._
    val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()

    // show displays the first 20 rows by default
    studentDF.show
    studentDF.show(30)
    studentDF.show(30, false) // 30 rows, without truncating long values

    studentDF.take(10) // first 10 rows as an Array[Row]
    studentDF.first()  // the first row
    studentDF.head(3)  // the first 3 rows


    studentDF.select("email").show(30,false)


    studentDF.filter("name=''").show
    studentDF.filter("name='' OR name='NULL'").show


    // rows whose name starts with "M"
    studentDF.filter("SUBSTR(name,0,1)='M'").show

    studentDF.sort(studentDF("name")).show
    studentDF.sort(studentDF("name").desc).show

    studentDF.sort("name","id").show
    studentDF.sort(studentDF("name").asc, studentDF("id").desc).show

    studentDF.select(studentDF("name").as("student_name")).show


    val studentDF2 = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()

    studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id")).show

    spark.stop()


  }

  case class Student(id: Int, name: String, phone: String, email: String)
}