Spark SQL: converting an RDD into a DataFrame


Comparing the two ways Spark SQL converts an RDD into a DataFrame
1. Inferring the schema with reflection. The official documentation describes this approach as follows:
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
ps: the key pieces are import sqlContext.implicits._ and the case class definition, e.g. case class Person(name: String, age: Int):
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

2. Building the schema programmatically. The official documentation describes this approach as follows:
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.
  • Create an RDD of Rows from the original RDD;
  • Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  • Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

  • ps: val schemaString = "name age"
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    // Create an RDD
    val people = sc.textFile("examples/src/main/resources/people.txt")
    
    // The schema is encoded in a string
    val schemaString = "name age"
    
    // Import Row.
    import org.apache.spark.sql.Row;
    
    // Import Spark SQL data types
    import org.apache.spark.sql.types.{StructType,StructField,StringType};
    
    // Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    
    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    
    // Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    
    // Register the DataFrames as a table.
    peopleDataFrame.registerTempTable("people")
    
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT name FROM people")
    
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index or by field name.
    results.map(t => "Name: " + t(0)).collect().foreach(println)
    

    Summary of the comparison
    1. The reflection (case class) approach is the more readable of the two.
    2. A case class in Scala 2.10 can take at most 22 parameters, so this approach breaks down when a record has many fields (the Product-interface workaround is sketched right after this list).
    3. The programmatic approach is the better fit for day-to-day development.
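
    For point 2, the workaround the documentation mentions (a custom class implementing the Product interface) looks roughly like the sketch below; the Record class is a made-up three-field illustration, not part of the original post:

    // A hand-written Product implementation sidesteps the 22-parameter case class limit
    // of Scala 2.10 (only three fields are shown to keep the sketch short).
    class Record(val name: String, val age: Int, val city: String)
      extends Product with Serializable {
      def canEqual(that: Any): Boolean = that.isInstanceOf[Record]
      def productArity: Int = 3
      def productElement(n: Int): Any = n match {
        case 0 => name
        case 1 => age
        case 2 => city
      }
    }
    // With import sqlContext.implicits._ in scope, an RDD[Record] converts like a case class RDD:
    // sc.parallelize(Seq(new Record("Michael", 20, "SF"))).toDF()
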
    Code implementation
    Data source
    Michael,20
    Andy,30
    Justin,19
    

    1. Reflection
    package Sql
    
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by Administrator on 2016/12/6.
      * textFile -> table
      *   Michael,20
      *   Andy,30
      *   Justin,19
      * Reflection approach: import sqlContext.implicits._ so that .toDF() is available,
      * map each line onto the User case class, then register the result as a table:
      * textFile -> map(p => User(p(0), p(1).trim.toInt)) -> DataFrame -> table
      * finally query the table with sqlContext.sql("...")
      */
    object Sqltest3 {
      case class User(name:String,age:Int)
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf() // create the SparkConf object
        // set the application name, shown in the cluster web UI
        conf.setAppName("SparkSql App!")
        // run locally with 2 worker threads; no cluster required
        conf.setMaster("local[2]")
        // create the SparkContext with this configuration; it is the entry point to Spark
        val ssc = new SparkContext(conf)
        ssc.setLogLevel("ERROR")
        // create the SQL context
        val sqlContext = new SQLContext(ssc)
    
        // read the data source
        val dr =  ssc.textFile("d://sparktest/people2.txt")
        import sqlContext.implicits._
        val df = dr.map(_.split(","))
                   .map(p => User(p(0), p(1).trim.toInt)).toDF()
        // register the DataFrame as a temporary table
        df.registerTempTable("user")
        // print the schema inferred from the User case class
        df.printSchema()
        // DataFrame DSL queries
        df.select("name").show()
        df.filter(df("age") > 21).show()
        df.groupBy("age").count().show()
        val teenagers = sqlContext.sql("SELECT name, age FROM user WHERE age >= 13 AND age <= 19")
        teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    
        println("-------")
        ssc.stop()
      }
    }
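
    With the three sample records (Michael 20, Andy 30, Justin 19), only Justin falls inside the 13-19 range, so the final loop prints a single line: Name: Justin.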
    

    2. Programmatic
    package Sql
    
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Created by Administrator on 2016/12/6.
      * textFile -> table
      *   Michael,20
      *   Andy,30
      *   Justin,19
      * Programmatic approach: build a StructType schema by hand,
      * map each line onto a Row, attach the schema, then register the result as a table:
      * textFile -> map(p => Row(p(0), p(1))) -> schema -> DataFrame -> table
      * finally query the table with sqlContext.sql("...")
      */
    object Sqltest2 {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf() // create the SparkConf object
        conf.set("spark.executor.memory", "1g")
        // set the application name, shown in the cluster web UI
        conf.setAppName("Wow,TopNGroup App!")
        // run locally with 2 worker threads; no cluster required
        conf.setMaster("local[2]")
        // create the SparkContext with this configuration; it is the entry point to Spark
    
        val ssc = new SparkContext(conf)
        ssc.setLogLevel("ERROR")
        // create the SQL context
        val sqlContext = new SQLContext(ssc)
        // read the data source
        val dr = ssc.textFile("d://sparktest/people2.txt")
        val schemaString = "name age"
        /*  val schema = StructType(Array(
          StructField("name", StringType, true),
          StructField("age", StringType, true)
        ))*/
        val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
        val rowRDD = dr.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // age stays a string to match the StringType schema
        val df = sqlContext.createDataFrame(rowRDD, schema)
    
        // register the DataFrame as a temporary table
        df.registerTempTable("user")
        // print the schema defined by the StructType
        df.printSchema()
        // DataFrame DSL queries
        df.select("name").show()
        df.filter(df("age") > 21).show()
        df.groupBy("age").count().show()
        val teenagers = sqlContext.sql("SELECT name, age FROM user WHERE age >= 13 AND age <= 19")
        teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
        println("-------")
        ssc.stop()
      }
    }
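
    Note that age is declared as StringType in this version, so the numeric comparisons above rely on Spark's implicit type coercion. If a real numeric column is preferred, the schema can carry an IntegerType field instead; a small sketch, assuming the dr RDD and sqlContext from the code above are in scope:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Same two-column layout, but age is parsed to Int so the schema can use IntegerType.
    val typedSchema = StructType(Seq(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    val typedRowRDD = dr.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
    val typedDF = sqlContext.createDataFrame(typedRowRDD, typedSchema)
    typedDF.filter(typedDF("age") > 21).show() // numeric comparison on a real Int column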