spark sql学習入門

5835 ワード

SPark SQLのDataFrameの操作とRDDの変換
関連概念:
 
         SParkのコアはRDDであり、それは弾性分布データセットであり、一連の動作に対応している。Spark SQLはsparkにおけるデータ処理の一つのモジュールであり、抽象的なデータ操作方法を提供し、分散的な照会データセットをDataFrameといいます。また、sparkSQLは、既存のデータセットから、例えばhive種から直接データを読み込むこともできます。具体的には、ここを見に行ってもいいです。DataFrameは分散式で、列名を組織とするデータセットです。本質的にはデータベースの中の一つのデータテーブルにも相当します。これとは違って、彼自身の一連の最適化方法をみんなで使います。データ構造のファイル(json形式)、hiveの中のtable、外部データセット、既存のRDDはすべてDataFrameを生成できます。sparkは非常に便利なコードを提供しています。spark-shellで具体的なアプリケーションを見に来ます。
       まず、最も主要なタスクはもちろん、sparkContect(sc)を作成し、sparkSqlという二つの最も主要なメンバー変数、コードの例は以下の通りです。
<span style="font-size:18px;">val sc: SparkContext 
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._</span>
       
   隠士変換とは現存するRDDをDataFrameに変換するためのものです。もちろん、hiveContect変数を作成することもできます。これはsql Contectの父の類です。具体的にはhiveContectに関してはここでは多く言いません。
        
      Data Frameの具体的な操作を見てみます。ここではdataFrameの作成と操作方法について説明します。
val sc: SparkContext 
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// dataframe   , json    
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show      DataFrame   
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

//           ,     ,            
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select          
df.select("name").show()
// name
// Michael
// Andy
// Justin

df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

df.filter(df("age") > 21).show()
// age name
// 30  Andy
// age         age    
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
RDD作成DataFrame
      ここでは既存のRDDからDataFrameに変換する2つの方法がある。第一の方法は、まずcase classを作成し、対応するパラメータを反射のメカニズムで読み取り、対応するテーブルの列名にします。反射機構を利用して見て以来、特に簡潔で、既存のパターンを知っているときに、すぐに作成できます。二つ目の方法はcase classでパラメータを抽出するのではなく、直接文字列を定義してパラメータを抽出します。case classがない場合がありますから。プログラムインターフェースを実現し、その後、既存のRDDにappyをアップデートする。この方法はロバスト性があります。あなたがアプリケーションを実行している間、適応するパラメータタイプをまだ知らないとき、この方法はいい選択です。
 
反射メカニズム:
         簡単に言えば、反射機構はまずcase classを定義します。このクラスにはいくつかのパラメータが含まれています。その後、反射機構を利用してパラメータを抽出すれば、対応するパラメータがtableの列名として利用できます。しかしcase classのメリットは言うまでもなく、皆さんは知っています。tableを登録してから、相応の検索を行って修正を加えることができます。ハハ。具体的なコードは以下の通りです。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)//   ,             

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()// RDD   DataFrame
people.registerTempTable("people")//  table

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)// getValuesMap[Any]  
決定されたモデルプログラミングインターフェース:
        第一歩:ここでのモデルは、文字列表現のmodelからrows RDDを作成します。
        ステップ2:通過  StructTypeは、第1のステップに従って作成されたRDDに基づいて、schemaを作成する。
        ステップ3:createData Frame(rowRDD,schema)
<span style="font-size:18px;">// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"//   

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =//  StructField            schema
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.//  Row    people     RDD  rowRDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)//  table

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")//  table

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)</span>
        上で紹介したのは初歩のdataframeの作成と簡単な使用です。個人的にはsparkの速度は確かに速いと思いますが、性能的にはまだmapReduceのように安定しています。時々データが無くなります。