RDD Data Frame Data Setの相互変換

7538 ワード

RDD、DataFrame、Datasetの3つには多くの共通性があり、それぞれに適用されるシーンがある場合、3つの間で変換する必要があることが多い
DataFrame/Dataset RDD:
この変換は簡単です
val rdd1=testDF.rdd
val rdd2=testDS.rdd

RDD転送データFrame:
import spark.implicits._
val testDF = rdd.map {line=>
      (line._1,line._2)
    }.toDF("col1","col2")

一般的には、1行のデータをメタグループで一緒に書き、toDFにフィールド名を指定します.
RDD転送データセット:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //        
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS

各行のタイプ(case class)を定義する場合、フィールド名とタイプが与えられており、case classに値を追加すればよいことに注意してください.
Dataset転送DataFrame:
これも簡単です.case classをRowにカプセル化するだけですから.
import spark.implicits._
val testDF = testDS.toDF

DataFrame転送Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //        
val testDS = testDF.as[Coltest]

この方法は、各カラムのタイプが与えられた後、asメソッドを使用してDatasetに移行することであり、これは、データ型がDataFrameであり、各フィールドの処理が必要である場合に極めて便利である.いくつかの特殊な操作を使う時、必ずimport spark.implicits._をプラスしてさもなくばtoDF、toDSは使うことができません
DataFrame小Demo
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * sparksql          demo
  */
object SparkSqlDemo {

  //   sparkconf
  val conf: SparkConf = new SparkConf().setAppName("sqlDemo").setMaster("local[*]")

  //   sparkcontext
  val sc = new SparkContext(conf)

  //   sparksession      
  val sparkSession = new SparkSession(sc)

  //   sparksession      
  //  val ssc: SparkSession = SparkSession.builder().config(conf).getOrCreate()

  //   dataFrame 
  val dataFrame: DataFrame = sparkSession.read.json("path")

  //     DSL  ,      。
  // dataFrame.select("name").show()

  // Sql   
  dataFrame.createGlobalTempView("people")
  //    sql  
  sparkSession.sql("select * from people")
}