SparkSQL(8):DataSetとDataFrameの区別と変換


1.概念:
(1)DataSetとRDD
ビッグデータのフレームワークの多くはメモリのデータをディスクに書くので、rddとdataframeの代わりにDataSetが置き換えられます.なぜなら,現段階では下位シーケンス化メカニズムはjavaまたはKryoの形式を用いているからである.しかし、Javaのシーケンス化されたデータは大きく、格納Kryoに影響を与える小さなデータ量の処理は良いが、データ量が大きくなるとまた問題が発生するため、公式の解決策はカスタムエンコーダ(Encoder)を用いてシーケンス化することである
(2)DataSetとDataFrame
DataSetとDataFrameには大きな違いがあり、DataFrame開発はsqlと書かれていますが、DataSetはRDDのようなAPIを使用しています.データセットはデータ型が格納されたRDDであると理解できる
2.DataSetDataFrameRDDの違い:
(1)同一点:いずれも分散データセットであるDataFrameの下位はRDDであるが、DataSetはそうではないが、最後にRDDに変換してDataSetを実行し、DataFrameを実行する同一点は、いずれもデータ特徴、データ型の分散データセット(schema)(2)の相違点である.(a)schema情報:RDDのデータはデータ型がないDataFrameのデータは弱いデータ型であり、データ型チェックはしないschemaでデータ型が規定されているが、コンパイル時にはエラーが報告されず、実行時にエラーが報告されるDataSetのデータ型は強いデータ型(b)のシーケンス化メカニズムである:RDDとDataFrameのデフォルトのシーケンス化メカニズムはjavaのシーケンス化であり、Kyroのメカニズムに変更できるDataSetはカスタムデータエンコーダを用いてシーケンス化と逆シーケンス化を行う
3.作成方法:
(1)toDSを使う前に
import sqlContext.implicits._

(2)メモリ内のデータをDataSetに変換する
// Encoders for most common types are automatically provided by importing 
sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

ここで、collect():すべての行情報を含むArrayを返します.
(3)case classオブジェクトを直接DataSetに変換できる
    // Encoders are also created for case classes.
    case class Person(name: String, age: Long)
    val ds = Seq(Person("Andy", 32)).toDS()

(4)DataFrameをDataSetに変換するが、DataFrameのデータ型はcase classであることが要求される
また、DataFrameのデータ型はcase classと一致しなければならない(順序も一致しなければならない)
4.コード
package _0729DF
import org.apache.spark
import org.apache.spark.sql.{Dataset, SparkSession}
//import  org.apache.spark


/**
  * 
  */
object Dataset extends App{
//  import spark.implicits._
//
//  val ds = Seq(1, 2, 3).toDS()
//  ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)
//
//
//  // Encoders are also created for case classes.
//  case class Person(name: String, age: Long)
//  val ds = Seq(Person("Andy", 32)).toDS()
//  ds.show


  val session = SparkSession.builder()
    .appName("app")
    .master("local")
    .getOrCreate()

  val sqlContext = session.sqlContext
  val wcDs = sqlContext.read.textFile("datas/halibote.txt")

  //       
  import session.implicits._
  val wordData=wcDs.flatMap(_.split(" "))
  wordData.createTempView("t_word")

  wordData.show()
  //wordData.printSchema()

  // Encoders for most common types are automatically provided by importing sqlContext.implicits._
  val ds=Seq(1,2,3).toDS()
  ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

//  // Encoders are also created for case classes.
//  case class Person(name: String, age: Long)
//  val ds = Seq(Person("Andy", 32)).toDS()


  // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
  case class Person(age:Long,name:String)
  val path = "datas/people.json"
  val people: Dataset[Person] = sqlContext.read.json(path).as[Person]
  people.show()


}