Spark SQL Dataset API全集

14403 ワード

概要
org.apache.spark.sql.DatasetはSpark SQLの中の核心の種類で、定義は以下の通りです。
class Dataset[T] extends Serializable
DataFrameはDataset[Row]の別名です。
本文はspark 2.3.0.
以下は種類の方法の紹介です。
クラスの方法
アクション
collect(): Array[T]
      ,  Dataset      。
  :         driver     。

collectAsList(): List[T]
  ,    Java list。

count(): Long
    

describe(cols: String*): DataFrame
          ,  count, mean, stddev, min, and max.

head(): T
     

head(n: Int): Array[T]
   N 

first(): T
     , head()   。

foreach(f: (T) ⇒ Unit): Unit
       f  

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
         f  

reduce(func: (T, T) ⇒ T): T
      func, RDD          ,      。
  :               ,            。

show(numRows: Int, truncate: Int, vertical: Boolean): Unit
         。numRows:     ,truncate:             ,vertical:    。

show(numRows: Int, truncate: Int): Unit
show(numRows: Int, truncate: Boolean): Unit
show(truncate: Boolean): Unit
numRows=20 truncate=20

show(numRows: Int): Unit
truncate=20

show(): Unit
numRows=20 truncate=20

summary(statistics: String*): DataFrame
     statistics     ,    count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
          。

take(n: Int): Array[T]
   n 

takeAsList(n: Int): List[T]
   n    list

toLocalIterator(): Iterator[T]
           
The iterator will consume as much memory as the largest partition in this Dataset.

基本関数(Baic Dataset functions)
as[U](implicit arg0: Encoder[U]): Dataset[U]
          U,    Dataset

persist(newLevel: StorageLevel): Dataset.this.type
    ,       。

persist(): Dataset.this.type
 cache  

cache(): Dataset.this.type
    ,MEMORY_AND_DISK  。
  :RDD cache     MEMORY_ONLY。

checkpoint(eager: Boolean): Dataset[T]
    checkpointed Dataset,Dataset           。

checkpoint(): Dataset[T]
  ,eager=true.

columns: Array[String]
          。

dtypes: Array[(String, String)]
             。

createGlobalTempView(viewName: String): Unit
        (view),     Spark    。
   session  。e.g. SELECT * FROM global_temp.view1.

createOrReplaceGlobalTempView(viewName: String): Unit
  ,      。

createTempView(viewName: String): Unit
        (view),   SparkSession   。
  :       ,   db1.view1       。

createOrReplaceTempView(viewName: String): Unit
  ,      。

explain(): Unit
        
  :queryExecution  ,      。

explain(extended: Boolean): Unit
    +      

hint(name: String, parameters: Any*): Dataset[T]
  dataset  hint。//todo
e.g. df1.join(df2.hint("broadcast"))

inputFiles: Array[String]
    Dataset     (Returns a best-effort snapshot of the files that compose this Dataset)

isLocal: Boolean
collect take        ,   executor.

localCheckpoint(eager: Boolean): Dataset[T]
    Checkpoint,   dataset。

localCheckpoint(): Dataset[T]
eager=true

printSchema(): Unit
  schema  

rdd: RDD[T]
dataset   RDD

schema: StructType
schema

storageLevel: StorageLevel
      ,   persist  StorageLevel.NONE

toDF(): DataFrame
toDF(colNames: String*): DataFrame
  DataFrame,    RDD  DataFrame。

unpersist(): Dataset.this.type
unpersist(blocking: Boolean): Dataset.this.type
    ,blocking       blocks      ,      。

write: DataFrameWriter[T]
DataFrameWriter,        。

writeStream: DataStreamWriter[T]
DataStreamWriter,       。
フロー関数(streming)
isStreaming: Boolean
      

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
Defines an event time watermark for this Dataset.
//TODO

強タイプ変換(Typed transformations)
alias(alias: Symbol): Dataset[T]
alias(alias: String): Dataset[T]
as(alias: Symbol): Dataset[T]
as(alias: String): Dataset[T]
 Dataset    

coalesce(numPartitions: Int): Dataset[T]
    (      )

distinct(): Dataset[T]
dropDuplicates   

dropDuplicates(col1: String, cols: String*): Dataset[T]
dropDuplicates(colNames: Array[String]): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(): Dataset[T]
      ,     。

except(other: Dataset[T]): Dataset[T]
  other     。 EXCEPT DISTINCT in SQL。
//TODO

filter(func: (T) ⇒ Boolean): Dataset[T]
filter(conditionExpr: String): Dataset[T]
filter(condition: Column): Dataset[T]
       
e.g.
peopleDs.filter("age > 15")
peopleDs.filter($"age" > 15)

flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
    map  ,          。

groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]
   func    key,   key  。

intersect(other: Dataset[T]): Dataset[T]
   dataset   ,   INTERSECT in SQL.

joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
inner equi-join  dataset

joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
joinType  :inner, cross, outer, full, full_outer, left, left_outer, right, right_outer

limit(n: Int): Dataset[T]
   n , head    ,head   action,         。

map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
        func  ,        dataset。

mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]
        func  ,        dataset。

orderBy(sortExprs: Column*): Dataset[T]
orderBy(sortCol: String, sortCols: String*): Dataset[T]
sort   

sort(sortExprs: Column*): Dataset[T]
sort(sortCol: String, sortCols: String*): Dataset[T]
      ,  asc。
e.g. ds.sort($"col1", $"col2".desc)

sortWithinPartitions(sortExprs: Column*): Dataset[T]
sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T]
     , "SORT BY" in SQL (Hive QL).

randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
         

repartition(partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int): Dataset[T]
      ,   ,    (hash), "DISTRIBUTE BY" in SQL。
      spark.sql.shuffle.partitions

repartitionByRange(partitionExprs: Column*): Dataset[T]
repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
      ,   ,    ,  Range partition  ,      。
         ascending nulls first,        。

sample(withReplacement: Boolean, fraction: Double): Dataset[T]
sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
sample(fraction: Double): Dataset[T]
sample(fraction: Double, seed: Long): Dataset[T]
       
withReplacement:Sample with replacement or not.
fraction:Fraction of rows to generate, range [0.0, 1.0].
seed:Seed for sampling.

select[U1](c1: TypedColumn[T, U1]): Dataset[U1]
   /        

transform[U](t: (Dataset[T]) ⇒ Dataset[U]): Dataset[U]
  t    Dataset。

union(other: Dataset[T]): Dataset[T]
  UNION ALL in SQL。
         :
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+

unionByName(other: Dataset[T]): Dataset[T]
 union  ,       :
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   6|   4|   5|
// +----+----+----+

where(conditionExpr: String): Dataset[T]
where(condition: Column): Dataset[T]
filter   


弱タイプ変換(Unityped transformations)
戻るタイプはDataFrameで、Datasetではありません。
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
   dataset    。
ds.agg(...)   ds.groupBy().agg(...)    。
e.g.
ds.agg(max($"age"), avg($"salary"))
ds.agg(Map("age" -> "max", "salary" -> "avg"))
ds.agg("age" -> "max", "salary" -> "avg")


apply(colName: String): Column
col(colName: String): Column
colRegex(colName: String): Column
     。

crossJoin(right: Dataset[_]): DataFrame
cross join。

cube(col1: String, cols: String*): RelationalGroupedDataset
cube(cols: Column*): RelationalGroupedDataset
         cube。
//TODO

drop(col: Column): DataFrame
drop(colNames: String*): DataFrame
drop(colName: String): DataFrame
      。

groupBy(col1: String, cols: String*): RelationalGroupedDataset
groupBy(cols: Column*): RelationalGroupedDataset
      

join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_]): DataFrame
    DataFrame join。
joinExprs:$"df1Key" === $"df2Key"
usingColumn:Seq("user_id", "user_name")
joinType:Default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.


na: DataFrameNaFunctions
 DataFrameNaFunctions

stat: DataFrameStatFunctions
 DataFrameStatFunctions

rollup(col1: String, cols: String*): RelationalGroupedDataset
rollup(cols: Column*): RelationalGroupedDataset
       rollup  。//TODO

select(col: String, cols: String*): DataFrame
select(cols: Column*): DataFrame
selectExpr(exprs: String*): DataFrame
     、SQL   。

withColumn(colName: String, col: Column): DataFrame
       。

withColumnRenamed(existingName: String, newName: String): DataFrame
      。
グループ化されていません。

queryExecution: QueryExecution
    

sparkSession: SparkSession
   dataset SparkSession

sqlContext: SQLContext
dataset SQLContext

toJSON: Dataset[String]
      JSON   。

toString(): String
Any toString