SparkSQL(10)DataFrame基本APIの操作-1-(DSL)
一、DataFrameとRDDの対比
1.同じ点:両方とも分散型ストレージデータセットであり、ビッグデータ処理に適している.
2.相違点:
(1)RDDは内部構造を知らない
(2)DataFrameは詳細な内部構造があり、データテーブルに類似する
二、DataFrame基本データソースとAPI操作
1.DataFrameデータソース
公式サイト:http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#datasets-and-dataframes
dataframeデータのロード
2.基本APIの操作
(1)printSchema:dataframe対応のschema情報を出力する
(2)show:dataframeの上位20レコードを出力
(3)select:ある列のすべてのデータをクエリーする
(4)col:ある列の列名を返す
(5)where/filter:カラムの値に基づいてフィルタリング
または
(6)groupBy:ある列に従ってグループ化し、集約操作を行う
(7)sort/orderBy:グローバルデータ並べ替え機能、Hiveのorder by文のように所与のフィールドに従ってすべてのデータの並べ替えを行う
または、サーバで検証します.
(8)limit:前のN個のデータ記録を取得する
(9)sortWithinPartitions:ローカルソート
3.コード
(1)例1
(2)例2
1.同じ点:両方とも分散型ストレージデータセットであり、ビッグデータ処理に適している.
2.相違点:
(1)RDDは内部構造を知らない
(2)DataFrameは詳細な内部構造があり、データテーブルに類似する
二、DataFrame基本データソースとAPI操作
1.DataFrameデータソース
公式サイト:http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#datasets-and-dataframes
dataframeデータのロード
val spark=SparkSession.builder().appName("DataFrameApp").master("local[2]").getOrCreate()
// json dataframe
val peopleDF= spark.read.format("json").load("datas/people.json")
2.基本APIの操作
(1)printSchema:dataframe対応のschema情報を出力する
peopleDF.printSchema()
(2)show:dataframeの上位20レコードを出力
peopleDF.show() // 20
// 30
peopleDF.show(30)
(3)select:ある列のすべてのデータをクエリーする
peopleDF.select("name").show()
(4)col:ある列の列名を返す
// ,
peopleDF.select(peopleDF.col("name"),(peopleDF.col("age")+10).as("age2")).show()
(5)where/filter:カラムの値に基づいてフィルタリング
peopleDF.filter(peopleDF.col("age")>19).show()
または
//name M
studentDF.filter("SUBSTR(name,0,1)='M'").show
(6)groupBy:ある列に従ってグループ化し、集約操作を行う
peopleDF.groupBy("age").count().show()
(7)sort/orderBy:グローバルデータ並べ替え機能、Hiveのorder by文のように所与のフィールドに従ってすべてのデータの並べ替えを行う
// ,
PersonDF.sort($"salary".desc,$"empAge".asc).select("empName","salary","empAge").show
または、サーバで検証します.
studentDF.sort("name","id").show
studentDF.sort(studentDF("name").asc, studentDF("id").desc).show
(8)limit:前のN個のデータ記録を取得する
(9)sortWithinPartitions:ローカルソート
PersonDF.sortWithinPartitions($"salary".desc,$"empAge".asc)
.select("empName","salary","empAge").show
3.コード
(1)例1
package _0729DF
import _0728sql.Avg_UDAF
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions._
/**
*
*/
/**
* sparkDSL
* sparksql ,sql
*
* 1/ , sql
* 2/ sql
* ( ) join , ,filter
* 3/ sql , sql , , dsl
* 4/spark , , spark ,
*
*/
case class Person1(empName:String,empAge:Int,empGender:String,salary:Float,deptNo:Int)
case class Dept1(deptNo:Int,deptName:String)
object DSLdemo extends App{
// 1.
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("DSLdemo")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new HiveContext(sc)
sqlContext.udf.register("gender2Num",(gender:String) => {
gender.toUpperCase() match {
case "M" => 0
case "F" => 1
case _ => -1
}
})
val rddp = sc.parallelize(Array(
Person1(" ",21,"M",3231.2f,1),
Person1(" ",23,"M",3412.2f,1),
Person1(" ",52,"M",3562.2f,1),
Person1(" ",21,"M",5655.2f,1),
Person1(" ",23,"F",6574.2f,2),
Person1(" ",21,"F",4547.2f,1),
Person1(" ",61,"M",3231.2f,2),
Person1("mary",43,"F",4354.2f,1),
Person1("leo",53,"M",3231.2f,1),
Person1("jack",12,"M",1111.2f,3),
Person1("jane",34,"F",2624.2f,1),
Person1("lili",41,"F",6754.2f,3)
))
val rddd = sc.parallelize(Array(
Dept1(1," 1"),
Dept1(2," 2"),
Dept1(4," 4")
))
import sqlContext.implicits._
val deptDF: DataFrame = rddd.toDF()
val PersonDF: DataFrame = rddp.toDF()
PersonDF.registerTempTable("p")
deptDF.cache()
PersonDF.cache()
//DSL
println("===========================DSL===============================")
/**
* select empName,XXX,XXX from PersonDF
*
* select ,
* ( Dataset[Person], ),
* select (map)casewhen,substring ....
*/
// :( )
PersonDF.select("empName","empAge","empGender").show()
// :( column)
// $ stringtocolumn , functions
//import org.apache.spark.sql.functions._
PersonDF.select($"empName" as("empName1"),$"empAge" as("empAge1"),$"empGender" as("empGender1")).show()
PersonDF.select(col("empName").as("empName2"),
col("empAge").as("empAge2"),
col("empGender").as("empGender2")).show()
// : :
PersonDF.selectExpr("empName","empAge","gender2Num(empGender) as sex").show()
println("===========================where/filter======================================")
PersonDF.where("empAge > 30 AND empGender = 'M' AND deptNo = 1").show
PersonDF.where("empAge > 30").where("empGender = 'M' AND deptNo = 1").show
PersonDF.filter($"empAge" > 30 && $"empGender" === "F").show
println("=============================sort===========================================")
// ,
//
PersonDF.sort("salary").select("empName","salary").show
PersonDF.sort($"salary".desc).select("empName","salary").show
// ,
PersonDF.sort($"salary".desc,$"empAge".asc).select("empName","salary","empAge").show
println("============================order by=========================================")
PersonDF.repartition(5).orderBy($"salary".desc,$"empAge".asc).select("empName","salary","empAge").show
//
print("============================sortWithinPartitions===============================")
PersonDF.sortWithinPartitions($"salary".desc,$"empAge".asc)
.select("empName","salary","empAge").show
//
println("============================group by==================================")
// BUG, ,
// 1.6 , spark
PersonDF.groupBy("empGender")
.agg(
"salary" -> "avg",
"salary" -> "sum"
).show
// :
PersonDF.groupBy("empGender")
.agg(
avg("salary").as("avg1"),
sum("salary").as("sum1"),
count(lit(1)).as("cnt")
).show
// sql
sqlContext.sql(
"""
|select empGender,sum(salary)as sum_sql,avg(salary) as avg_sql
|from p
|group by empGender
""".stripMargin).show
println("============================== ===================================")
//
sqlContext.udf.register("self_avg",Avg_UDAF)
PersonDF.groupBy("empGender")
.agg(
"salary" -> "self_avg"
).withColumn("test",lit(0))
.show
println("==============================limit===================================")
PersonDF.limit(4).show()
println("==============================join==========================================")
/**
* , , , ,
*/
// deptno ?
//PersonDF.join(deptDF,$"deptno"===$"deptno").show
PersonDF.join(deptDF,"deptno").show
// , ,
// , ( )
PersonDF.join(deptDF,PersonDF.col("deptno")===deptDF.col("deptno"))
//.toDF(" 1 "," 2 ")
.select(PersonDF.col("deptno"),PersonDF.col("salary")).show
deptDF.toDF("d1","dname").join(PersonDF,$"deptno"===$"d1").show
println("-------------------------- -------------------------------")
PersonDF.join(deptDF,Seq("deptNo"),"left").show
PersonDF.join(deptDF,Seq("deptNo"),"right").show
PersonDF.join(deptDF,Seq("deptNo"),"full").show
//select * from deptDF where deptNo in (select deptNo from PersonDF)
deptDF.join(PersonDF,Seq("deptNo"),"leftsemi").show
println("=============================casewhen====================================")
/**
* select *,
* case when salary <= 2000
* then " "
* when salary > 2000 and salary <=4000
* then " "
* else " " as salarylevel
* end from PersonDF
*
* // ,
*/
PersonDF.select(PersonDF.col("empName"),PersonDF.col("salary"),
when(PersonDF.col("salary") <= 2000," ")
.when(PersonDF.col("salary") > 2000 && PersonDF.col("salary") <= 4000," ")
.otherwise(" ").as("salarylevel")
).show()
println("======================== ====================================")
/**
*1. , hivecontext
*
*2. HiveContext : -XX:PermSize=128M -XX:MaxPermSize=256M
*3. , salary , ( row_number unionall)
* select * from
* (select *,
* row_number() over (partition by deptno order by salary desc) as rnk
* from PersonDF) a
* where rnk <=3
*
* select * from PersonDF where deptno = 1 order by salary desc limit 3
* union all
* select * from PersonDF where deptno = 2 order by salary desc limit 3
* union all
* select * from PersonDF where deptno = 3 order by salary desc limit 3
*/
val w = Window.partitionBy("deptNo").orderBy($"salary".desc,$"empAge".asc)
PersonDF.select($"empName",$"empAge",$"deptNo",$"salary",
row_number().over(w).as("rnk")
).where($"rnk".leq(3)).show
// println("=========================union ALL============================")
// PersonDF.select($"empName",$"empAge",$"deptNo",$"salary"
// ).where($"deptNo" === 1).sort($"salary".desc,$"empAge".asc).limit(3)
// .unionAll(PersonDF.select($"empName",$"empAge",$"deptNo",$"salary"
// ).where($"deptNo" === 2).sort($"salary".desc,$"empAge".asc).limit(3))
// .unionAll(PersonDF.select($"empName",$"empAge",$"deptNo",$"salary"
// ).where($"deptNo" === 3).sort($"salary".desc,$"empAge".asc).limit(3)).show
//
//
//
//
// //Thread.sleep(1232323l)
// deptDF.unpersist()
// PersonDF.unpersist()
}
(2)例2
package MoocSparkSQL
import org.apache.spark.sql.SparkSession
/**
*
* DataFrame
*/
object DataFrameCase {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()
// RDD ==> DataFrame
val rdd = spark.sparkContext.textFile("datas/student.data") // !20181001
// :
import spark.implicits._
val studentDF = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
//show 20
studentDF.show
studentDF.show(30)
studentDF.show(30, false)
studentDF.take(10)
studentDF.first()
studentDF.head(3)
studentDF.select("email").show(30,false)
studentDF.filter("name=''").show
studentDF.filter("name='' OR name='NULL'").show
//name M
studentDF.filter("SUBSTR(name,0,1)='M'").show
studentDF.sort(studentDF("name")).show
studentDF.sort(studentDF("name").desc).show
studentDF.sort("name","id").show
studentDF.sort(studentDF("name").asc, studentDF("id").desc).show
studentDF.select(studentDF("name").as("student_name")).show
val studentDF2 = rdd.map(_.split("\\|")).map(line => Student(line(0).toInt, line(1), line(2), line(3))).toDF()
studentDF.join(studentDF2, studentDF.col("id") === studentDF2.col("id")).show
spark.stop()
}
case class Student(id: Int, name: String, phone: String, email: String)
}