Spark-RDD、DS、DF相互変換

29713 ワード

一、RDDとDataFrame間の変換
テストデータを準備し、ローカルセットをRDDに変換
scala> val rdd=sc.makeRDD(List("Mina,19","Andy,30","Michael,29"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at makeRDD at <console>:24

なお、RDDはimport spark.implicits._以降のみtoDF、toDS機能を有する
scala> import spark.implicits._
import spark.implicits._

1.RDDをDataFrameに変換
{1}toDF,左のrddの汎用型はTupleでなければならない
scala> rdd.map{
     x=>{
     val par=x.split(",");(par(0),par(1).toInt)}}.toDF("name","age")
res3: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> res3.show
+-------+---+
|   name|age|
+-------+---+
|   Mina| 19|
|   Andy| 30|
|Michael| 29|
+-------+---+

{2}toDF,左のrddの汎用型はcaseクラスのrdd
SparkSQLは、汎用型がcaseクラスのRDDを自動的にDFに変換することができ、caseクラスはテーブルのschemaを定義し、caseクラス属性は反射によってテーブルのカラム名になる.Caseクラスは、SeqsやArrayのような複雑な構造を含むことができる.
scala> case class Person(name:String,age:Int)
defined class Person

scala> val df = rdd.map{
     x => val par = x.split(",");Person(par(0),par(1).toInt)}.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df.show
+-------+---+
|   name|age|
+-------+---+
|   Mina| 19|
|   Andy| 30|
|Michael| 29|
+-------+---+

{3}左rddがTupleまたはcaseクラスに戻れない場合は、StructTypeをカスタマイズできます.
caseクラスが事前に定義できない場合は、次の3つのステップでDataFrameを定義できます.
  • は、複数行構造のRDDを作成する.
  • StructTypeで表される行構造情報を作成する.
  • SparkSessionが提供するcreateDataFrame法によりSchema
  • を適用する
    scala> import org.apache.spark.sql._
    import org.apache.spark.sql._
    
    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    
    //               ,              ,  StructType
    scala> val schemaString = "name age"
    schemaString: String = name age
    
    scala> val field = schemaString.split(" ").map(filename=> filename match{
          case "name"=> StructField(filename,StringType,nullable = true); case "age"=>StructField(filename, IntegerType,nullable = true)})
    field: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,IntegerType,true))
    
    scala> val schema = StructType(field)
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
    
    scala> val rowRDD = rdd.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).toInt))
    rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[2] at map at <console>:35
    
    scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> peopleDF.show
    +-------+---+
    |   name|age|
    +-------+---+
    |   Mina| 19|
    |   Andy| 30|
    |Michael| 29|
    +-------+---+
    

    二、RDDとDataSet間の変換
    1.RDDをDataSetに変換
    case class Person(name:String,age:Int)  
    import spark.implicits._
    
    scala> val ds = rdd.map{
         x => val par = x.split(",");Person(par(0),par(1).trim().toInt)}.toDS
    ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
    
    scala> ds.show
    +-------+---+
    |   name|age|
    +-------+---+
    |   Mina| 19|
    |   Andy| 30|
    |Michael| 29|
    +-------+---+
    

    三、DataFrameとDataSet間の変換
    1.DataSetをDataFrameに変換
    scala> ds.toDF
    res14: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> ds.toDF.show
    +-------+---+
    |   name|age|
    +-------+---+
    |   Mina| 19|
    |   Andy| 30|
    |Michael| 29|
    +-------+---+
    

    2.DataFrameをDataSetに変換
    //	  case class
    case class Person(name:String,age:Int) 
    //	   dataframe            case class     
    df.as[Person] 
    
    scala> df.as[Person]
    res16: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]
    scala> df.as[Person].show
    +-------+---+
    |   name|age|
    +-------+---+
    |   Mina| 19|
    |   Andy| 30|
    |Michael| 29|
    +-------+---+