SparkSQL(8):DataSetとDataFrameの区別と変換
1.概念:
(1)DataSetとRDD
ビッグデータのフレームワークの多くはメモリのデータをディスクに書くので、rddとdataframeの代わりにDataSetが置き換えられます.なぜなら,現段階では下位シーケンス化メカニズムはjavaまたはKryoの形式を用いているからである.しかし、Javaのシーケンス化されたデータは大きく、格納Kryoに影響を与える小さなデータ量の処理は良いが、データ量が大きくなるとまた問題が発生するため、公式の解決策はカスタムエンコーダ(Encoder)を用いてシーケンス化することである
(2)DataSetとDataFrame
DataSetとDataFrameには大きな違いがあり、DataFrame開発はsqlと書かれていますが、DataSetはRDDのようなAPIを使用しています.データセットはデータ型が格納されたRDDであると理解できる
2.DataSetDataFrameRDDの違い:
(1)同一点:いずれも分散データセットであるDataFrameの下位はRDDであるが、DataSetはそうではないが、最後にRDDに変換してDataSetを実行し、DataFrameを実行する同一点は、いずれもデータ特徴、データ型の分散データセット(schema)(2)の相違点である.(a)schema情報:RDDのデータはデータ型がないDataFrameのデータは弱いデータ型であり、データ型チェックはしないschemaでデータ型が規定されているが、コンパイル時にはエラーが報告されず、実行時にエラーが報告されるDataSetのデータ型は強いデータ型(b)のシーケンス化メカニズムである:RDDとDataFrameのデフォルトのシーケンス化メカニズムはjavaのシーケンス化であり、Kyroのメカニズムに変更できるDataSetはカスタムデータエンコーダを用いてシーケンス化と逆シーケンス化を行う
3.作成方法:
(1)toDSを使う前に
(2)メモリ内のデータをDataSetに変換する
ここで、collect():すべての行情報を含むArrayを返します.
(3)case classオブジェクトを直接DataSetに変換できる
(4)DataFrameをDataSetに変換するが、DataFrameのデータ型はcase classであることが要求される
また、DataFrameのデータ型はcase classと一致しなければならない(順序も一致しなければならない)
4.コード
(1)DataSetとRDD
ビッグデータのフレームワークの多くはメモリのデータをディスクに書くので、rddとdataframeの代わりにDataSetが置き換えられます.なぜなら,現段階では下位シーケンス化メカニズムはjavaまたはKryoの形式を用いているからである.しかし、Javaのシーケンス化されたデータは大きく、格納Kryoに影響を与える小さなデータ量の処理は良いが、データ量が大きくなるとまた問題が発生するため、公式の解決策はカスタムエンコーダ(Encoder)を用いてシーケンス化することである
(2)DataSetとDataFrame
DataSetとDataFrameには大きな違いがあり、DataFrame開発はsqlと書かれていますが、DataSetはRDDのようなAPIを使用しています.データセットはデータ型が格納されたRDDであると理解できる
2.DataSetDataFrameRDDの違い:
(1)同一点:いずれも分散データセットであるDataFrameの下位はRDDであるが、DataSetはそうではないが、最後にRDDに変換してDataSetを実行し、DataFrameを実行する同一点は、いずれもデータ特徴、データ型の分散データセット(schema)(2)の相違点である.(a)schema情報:RDDのデータはデータ型がないDataFrameのデータは弱いデータ型であり、データ型チェックはしないschemaでデータ型が規定されているが、コンパイル時にはエラーが報告されず、実行時にエラーが報告されるDataSetのデータ型は強いデータ型(b)のシーケンス化メカニズムである:RDDとDataFrameのデフォルトのシーケンス化メカニズムはjavaのシーケンス化であり、Kyroのメカニズムに変更できるDataSetはカスタムデータエンコーダを用いてシーケンス化と逆シーケンス化を行う
3.作成方法:
(1)toDSを使う前に
import sqlContext.implicits._
(2)メモリ内のデータをDataSetに変換する
// Encoders for most common types are automatically provided by importing
sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)
ここで、collect():すべての行情報を含むArrayを返します.
(3)case classオブジェクトを直接DataSetに変換できる
// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()
(4)DataFrameをDataSetに変換するが、DataFrameのデータ型はcase classであることが要求される
また、DataFrameのデータ型はcase classと一致しなければならない(順序も一致しなければならない)
4.コード
package _0729DF
import org.apache.spark
import org.apache.spark.sql.{Dataset, SparkSession}
//import org.apache.spark
/**
*
*/
object Dataset extends App{
// import spark.implicits._
//
// val ds = Seq(1, 2, 3).toDS()
// ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)
//
//
// // Encoders are also created for case classes.
// case class Person(name: String, age: Long)
// val ds = Seq(Person("Andy", 32)).toDS()
// ds.show
val session = SparkSession.builder()
.appName("app")
.master("local")
.getOrCreate()
val sqlContext = session.sqlContext
val wcDs = sqlContext.read.textFile("datas/halibote.txt")
//
import session.implicits._
val wordData=wcDs.flatMap(_.split(" "))
wordData.createTempView("t_word")
wordData.show()
//wordData.printSchema()
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds=Seq(1,2,3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// // Encoders are also created for case classes.
// case class Person(name: String, age: Long)
// val ds = Seq(Person("Andy", 32)).toDS()
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
case class Person(age:Long,name:String)
val path = "datas/people.json"
val people: Dataset[Person] = sqlContext.read.json(path).as[Person]
people.show()
}