Spark_Scala-累積-RDD-DataSet-DataFrame-相互変換


Spark_Scala-累積-RDD-DataSet-DataFrame-相互変換
コードの例


import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Row schema used when converting the untyped DataFrame to a typed Dataset[User];
// column names ("id", "name", "age") must match the case-class field names.
case class User(id:Int,name:String,age:Int)

object SparkSQL03_Transform {

  /** Demonstrates the full conversion cycle: RDD -> DataFrame -> Dataset -> DataFrame -> RDD[Row]. */
  def main(args: Array[String]): Unit = {

    // Run locally, using every available core.
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL03_Transform")

    // Entry point for the Spark SQL API.
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    // Brings in the implicit encoders/conversions required by toDF / as[T] below.
    import spark.implicits._

    // Source data: an RDD of (id, name, age) tuples.
    val tupleRDD: RDD[(Int, String, Int)] =
      spark.sparkContext.makeRDD(List((1, "zhansgan", 30), (2, "lisi", 40), (3, "wangwu", 79)))

    // RDD -> DataFrame, supplying the column names explicitly.
    val frame: DataFrame = tupleRDD.toDF("id", "name", "age")

    // DataFrame -> Dataset[User]; columns are bound to the case-class fields by name.
    val userDS: Dataset[User] = frame.as[User]

    // Dataset -> DataFrame again (drops the static typing).
    val untypedAgain: DataFrame = userDS.toDF()

    // DataFrame -> RDD of generic Rows.
    val rowRDD: RDD[Row] = untypedAgain.rdd

    // Print the "name" column (index 1) of each row.
    // NOTE(review): foreach runs on the executors, so output order is nondeterministic.
    rowRDD.foreach { row =>
      println(row.getString(1))
    }

    // Release the session's resources.
    spark.stop()
  }

}

/*
00/00/23 00:12:07 INFO CodeGenerator: Code generated in 11.185477 ms
zhansgan
lisi
wangwu
 */


RDDを直接Datasetに変換する例
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * rdd     dataset
 */


/**
 * Row schema for the typed Dataset built directly from the RDD.
 * Field names and order must match the tuple elements mapped in the example below.
 * @param id   user id
 * @param name user name
 * @param age  user age
 */

case class User1(id:Int,name:String,age:Int)

object SparkSQL03_Transform1 {

  /** Demonstrates converting an RDD directly to a typed Dataset (no DataFrame step) and back. */
  def main(args: Array[String]): Unit = {

    // Run locally, using every available core.
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL03_Transform")

    // Entry point for the Spark SQL API.
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    // Brings in the implicit encoders required by toDS() below.
    import spark.implicits._

    // Source data: an RDD of (id, name, age) tuples.
    val tupleRDD: RDD[(Int, String, Int)] =
      spark.sparkContext.makeRDD(List((1, "zhansgan", 30), (2, "lisi", 40), (3, "wangwu", 79)))

    // RDD[(Int, String, Int)] -> RDD[User1] -> Dataset[User1]
    val userRDD: RDD[User1] = tupleRDD.map { case (id, name, age) => User1(id, name, age) }
    val userDS: Dataset[User1] = userRDD.toDS()

    // Dataset -> RDD: the elements keep their User1 type (unlike DataFrame.rdd, which yields Rows).
    val backToRDD: RDD[User1] = userDS.rdd
    backToRDD.foreach(println) // runs on executors; output order is nondeterministic

    // Release the session's resources.
    spark.stop()
  }

}

/*
00/00/23 00:00:00 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
User1(3,wangwu,79)
User1(1,zhansgan,30)
User1(2,lisi,40)
 */




             ,
            ,
            ,
       ,
       ,
             ,
            ,
        ,
        ,
         ,
         ,
         ,
            ,
          ,
           ,
     ,
       ,
         ,
         ,
         ,
         ,
         ,
         ,
       ,
          ,
         ,
         ,
            ,
          ,
           ,
         ,
         ,
            ,
         ,
         ,
         ,
    ,
    ,
        ……
  
  
     
     ,   
     
    ,