Spark SQL Dataset API全集
14403 ワード
概要
org.apache.spark.sql.DatasetはSpark SQLの中の核心の種類で、定義は以下の通りです。
本文はspark 2.3.0.
以下は種類の方法の紹介です。
クラスの方法
アクション
戻るタイプはDataFrameで、Datasetではありません。
org.apache.spark.sql.DatasetはSpark SQLの中の核心の種類で、定義は以下の通りです。
class Dataset[T] extends Serializable
DataFrameはDataset[Row]の別名です。本文はspark 2.3.0.
以下は種類の方法の紹介です。
クラスの方法
アクション
collect(): Array[T]
, Dataset 。
: driver 。
collectAsList(): List[T]
, Java list。
count(): Long
describe(cols: String*): DataFrame
, count, mean, stddev, min, and max.
head(): T
head(n: Int): Array[T]
N
first(): T
, head() 。
foreach(f: (T) ⇒ Unit): Unit
f
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
f
reduce(func: (T, T) ⇒ T): T
func, RDD , 。
: , 。
show(numRows: Int, truncate: Int, vertical: Boolean): Unit
。numRows: ,truncate: ,vertical: 。
show(numRows: Int, truncate: Int): Unit
show(numRows: Int, truncate: Boolean): Unit
show(truncate: Boolean): Unit
numRows=20 truncate=20
show(numRows: Int): Unit
truncate=20
show(): Unit
numRows=20 truncate=20
summary(statistics: String*): DataFrame
statistics , count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
。
take(n: Int): Array[T]
n
takeAsList(n: Int): List[T]
n list
toLocalIterator(): Iterator[T]
The iterator will consume as much memory as the largest partition in this Dataset.
基本関数(Baic Dataset functions)as[U](implicit arg0: Encoder[U]): Dataset[U]
U, Dataset
persist(newLevel: StorageLevel): Dataset.this.type
, 。
persist(): Dataset.this.type
cache
cache(): Dataset.this.type
,MEMORY_AND_DISK 。
:RDD cache MEMORY_ONLY。
checkpoint(eager: Boolean): Dataset[T]
checkpointed Dataset,Dataset 。
checkpoint(): Dataset[T]
,eager=true.
columns: Array[String]
。
dtypes: Array[(String, String)]
。
createGlobalTempView(viewName: String): Unit
(view), Spark 。
session 。e.g. SELECT * FROM global_temp.view1.
createOrReplaceGlobalTempView(viewName: String): Unit
, 。
createTempView(viewName: String): Unit
(view), SparkSession 。
: , db1.view1 。
createOrReplaceTempView(viewName: String): Unit
, 。
explain(): Unit
:queryExecution , 。
explain(extended: Boolean): Unit
+
hint(name: String, parameters: Any*): Dataset[T]
dataset hint。//todo
e.g. df1.join(df2.hint("broadcast"))
inputFiles: Array[String]
Dataset (Returns a best-effort snapshot of the files that compose this Dataset)
isLocal: Boolean
collect take , executor.
localCheckpoint(eager: Boolean): Dataset[T]
Checkpoint, dataset。
localCheckpoint(): Dataset[T]
eager=true
printSchema(): Unit
schema
rdd: RDD[T]
dataset RDD
schema: StructType
schema
storageLevel: StorageLevel
, persist StorageLevel.NONE
toDF(): DataFrame
toDF(colNames: String*): DataFrame
DataFrame, RDD DataFrame。
unpersist(): Dataset.this.type
unpersist(blocking: Boolean): Dataset.this.type
,blocking blocks , 。
write: DataFrameWriter[T]
DataFrameWriter, 。
writeStream: DataStreamWriter[T]
DataStreamWriter, 。
フロー関数(streming)isStreaming: Boolean
withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
Defines an event time watermark for this Dataset.
//TODO
強タイプ変換(Typed transformations)alias(alias: Symbol): Dataset[T]
alias(alias: String): Dataset[T]
as(alias: Symbol): Dataset[T]
as(alias: String): Dataset[T]
Dataset
coalesce(numPartitions: Int): Dataset[T]
( )
distinct(): Dataset[T]
dropDuplicates
dropDuplicates(col1: String, cols: String*): Dataset[T]
dropDuplicates(colNames: Array[String]): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(): Dataset[T]
, 。
except(other: Dataset[T]): Dataset[T]
other 。 EXCEPT DISTINCT in SQL。
//TODO
filter(func: (T) ⇒ Boolean): Dataset[T]
filter(conditionExpr: String): Dataset[T]
filter(condition: Column): Dataset[T]
e.g.
peopleDs.filter("age > 15")
peopleDs.filter($"age" > 15)
flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
map , 。
groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]
func key, key 。
intersect(other: Dataset[T]): Dataset[T]
dataset , INTERSECT in SQL.
joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
inner equi-join dataset
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
joinType :inner, cross, outer, full, full_outer, left, left_outer, right, right_outer
limit(n: Int): Dataset[T]
n , head ,head action, 。
map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
func , dataset。
mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]
func , dataset。
orderBy(sortExprs: Column*): Dataset[T]
orderBy(sortCol: String, sortCols: String*): Dataset[T]
sort
sort(sortExprs: Column*): Dataset[T]
sort(sortCol: String, sortCols: String*): Dataset[T]
, asc。
e.g. ds.sort($"col1", $"col2".desc)
sortWithinPartitions(sortExprs: Column*): Dataset[T]
sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T]
, "SORT BY" in SQL (Hive QL).
randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
repartition(partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int): Dataset[T]
, , (hash), "DISTRIBUTE BY" in SQL。
spark.sql.shuffle.partitions
repartitionByRange(partitionExprs: Column*): Dataset[T]
repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
, , , Range partition , 。
ascending nulls first, 。
sample(withReplacement: Boolean, fraction: Double): Dataset[T]
sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
sample(fraction: Double): Dataset[T]
sample(fraction: Double, seed: Long): Dataset[T]
withReplacement:Sample with replacement or not.
fraction:Fraction of rows to generate, range [0.0, 1.0].
seed:Seed for sampling.
select[U1](c1: TypedColumn[T, U1]): Dataset[U1]
/
transform[U](t: (Dataset[T]) ⇒ Dataset[U]): Dataset[U]
t Dataset。
union(other: Dataset[T]): Dataset[T]
UNION ALL in SQL。
:
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 4| 5| 6|
// +----+----+----+
unionByName(other: Dataset[T]): Dataset[T]
union , :
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 6| 4| 5|
// +----+----+----+
where(conditionExpr: String): Dataset[T]
where(condition: Column): Dataset[T]
filter
弱タイプ変換(Unityped transformations)戻るタイプはDataFrameで、Datasetではありません。
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
dataset 。
ds.agg(...) ds.groupBy().agg(...) 。
e.g.
ds.agg(max($"age"), avg($"salary"))
ds.agg(Map("age" -> "max", "salary" -> "avg"))
ds.agg("age" -> "max", "salary" -> "avg")
apply(colName: String): Column
col(colName: String): Column
colRegex(colName: String): Column
。
crossJoin(right: Dataset[_]): DataFrame
cross join。
cube(col1: String, cols: String*): RelationalGroupedDataset
cube(cols: Column*): RelationalGroupedDataset
cube。
//TODO
drop(col: Column): DataFrame
drop(colNames: String*): DataFrame
drop(colName: String): DataFrame
。
groupBy(col1: String, cols: String*): RelationalGroupedDataset
groupBy(cols: Column*): RelationalGroupedDataset
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_]): DataFrame
DataFrame join。
joinExprs:$"df1Key" === $"df2Key"
usingColumn:Seq("user_id", "user_name")
joinType:Default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.
na: DataFrameNaFunctions
DataFrameNaFunctions
stat: DataFrameStatFunctions
DataFrameStatFunctions
rollup(col1: String, cols: String*): RelationalGroupedDataset
rollup(cols: Column*): RelationalGroupedDataset
rollup 。//TODO
select(col: String, cols: String*): DataFrame
select(cols: Column*): DataFrame
selectExpr(exprs: String*): DataFrame
、SQL 。
withColumn(colName: String, col: Column): DataFrame
。
withColumnRenamed(existingName: String, newName: String): DataFrame
。
グループ化されていません。
queryExecution: QueryExecution
sparkSession: SparkSession
dataset SparkSession
sqlContext: SQLContext
dataset SQLContext
toJSON: Dataset[String]
JSON 。
toString(): String
Any toString