spark sql学習入門
5835 ワード
SPark SQLのDataFrameの操作とRDDの変換
関連概念:
SParkのコアはRDDであり、それは弾性分布データセットであり、一連の動作に対応している。Spark SQLはsparkにおけるデータ処理の一つのモジュールであり、抽象的なデータ操作方法を提供し、分散的な照会データセットをDataFrameといいます。また、sparkSQLは、既存のデータセットから、例えばhive種から直接データを読み込むこともできます。具体的には、ここを見に行ってもいいです。DataFrameは分散式で、列名を組織とするデータセットです。本質的にはデータベースの中の一つのデータテーブルにも相当します。これとは違って、彼自身の一連の最適化方法をみんなで使います。データ構造のファイル(json形式)、hiveの中のtable、外部データセット、既存のRDDはすべてDataFrameを生成できます。sparkは非常に便利なコードを提供しています。spark-shellで具体的なアプリケーションを見に来ます。
まず、最も主要なタスクはもちろん、sparkContect(sc)を作成し、sparkSqlという二つの最も主要なメンバー変数、コードの例は以下の通りです。
隠士変換とは現存するRDDをDataFrameに変換するためのものです。もちろん、hiveContect変数を作成することもできます。これはsql Contectの父の類です。具体的にはhiveContectに関してはここでは多く言いません。
Data Frameの具体的な操作を見てみます。ここではdataFrameの作成と操作方法について説明します。
ここでは既存のRDDからDataFrameに変換する2つの方法がある。第一の方法は、まずcase classを作成し、対応するパラメータを反射のメカニズムで読み取り、対応するテーブルの列名にします。反射機構を利用して見て以来、特に簡潔で、既存のパターンを知っているときに、すぐに作成できます。二つ目の方法はcase classでパラメータを抽出するのではなく、直接文字列を定義してパラメータを抽出します。case classがない場合がありますから。プログラムインターフェースを実現し、その後、既存のRDDにappyをアップデートする。この方法はロバスト性があります。あなたがアプリケーションを実行している間、適応するパラメータタイプをまだ知らないとき、この方法はいい選択です。
反射メカニズム:
簡単に言えば、反射機構はまずcase classを定義します。このクラスにはいくつかのパラメータが含まれています。その後、反射機構を利用してパラメータを抽出すれば、対応するパラメータがtableの列名として利用できます。しかしcase classのメリットは言うまでもなく、皆さんは知っています。tableを登録してから、相応の検索を行って修正を加えることができます。ハハ。具体的なコードは以下の通りです。
第一歩:ここでのモデルは、文字列表現のmodelからrows RDDを作成します。
ステップ2:通過
ステップ3:createData Frame(rowRDD,schema)
関連概念:
SParkのコアはRDDであり、それは弾性分布データセットであり、一連の動作に対応している。Spark SQLはsparkにおけるデータ処理の一つのモジュールであり、抽象的なデータ操作方法を提供し、分散的な照会データセットをDataFrameといいます。また、sparkSQLは、既存のデータセットから、例えばhive種から直接データを読み込むこともできます。具体的には、ここを見に行ってもいいです。DataFrameは分散式で、列名を組織とするデータセットです。本質的にはデータベースの中の一つのデータテーブルにも相当します。これとは違って、彼自身の一連の最適化方法をみんなで使います。データ構造のファイル(json形式)、hiveの中のtable、外部データセット、既存のRDDはすべてDataFrameを生成できます。sparkは非常に便利なコードを提供しています。spark-shellで具体的なアプリケーションを見に来ます。
まず、最も主要なタスクはもちろん、sparkContect(sc)を作成し、sparkSqlという二つの最も主要なメンバー変数、コードの例は以下の通りです。
<span style="font-size:18px;">val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._</span>
隠士変換とは現存するRDDをDataFrameに変換するためのものです。もちろん、hiveContect変数を作成することもできます。これはsql Contectの父の類です。具体的にはhiveContectに関してはここでは多く言いません。
Data Frameの具体的な操作を見てみます。ここではdataFrameの作成と操作方法について説明します。
val sc: SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// dataframe , json
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// Show DataFrame
df.show()
// age name
// null Michael
// 30 Andy
// 19 Justin
// , ,
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select
df.select("name").show()
// name
// Michael
// Andy
// Justin
df.select(df("name"), df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
df.filter(df("age") > 21).show()
// age name
// 30 Andy
// age age
df.groupBy("age").count().show()
// age count
// null 1
// 19 1
// 30 1
RDD作成DataFrameここでは既存のRDDからDataFrameに変換する2つの方法がある。第一の方法は、まずcase classを作成し、対応するパラメータを反射のメカニズムで読み取り、対応するテーブルの列名にします。反射機構を利用して見て以来、特に簡潔で、既存のパターンを知っているときに、すぐに作成できます。二つ目の方法はcase classでパラメータを抽出するのではなく、直接文字列を定義してパラメータを抽出します。case classがない場合がありますから。プログラムインターフェースを実現し、その後、既存のRDDにappyをアップデートする。この方法はロバスト性があります。あなたがアプリケーションを実行している間、適応するパラメータタイプをまだ知らないとき、この方法はいい選択です。
反射メカニズム:
簡単に言えば、反射機構はまずcase classを定義します。このクラスにはいくつかのパラメータが含まれています。その後、反射機構を利用してパラメータを抽出すれば、対応するパラメータがtableの列名として利用できます。しかしcase classのメリットは言うまでもなく、皆さんは知っています。tableを登録してから、相応の検索を行って修正を加えることができます。ハハ。具体的なコードは以下の通りです。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)// ,
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()// RDD DataFrame
people.registerTempTable("people")// table
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)// getValuesMap[Any]
決定されたモデルプログラミングインターフェース:第一歩:ここでのモデルは、文字列表現のmodelからrows RDDを作成します。
ステップ2:通過
StructType
は、第1のステップに従って作成されたRDDに基づいて、schemaを作成する。ステップ3:createData Frame(rowRDD,schema)
<span style="font-size:18px;">// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"//
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =// StructField schema
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.// Row people RDD rowRDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)// table
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")// table
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)</span>
上で紹介したのは初歩のdataframeの作成と簡単な使用です。個人的にはsparkの速度は確かに速いと思いますが、性能的にはまだmapReduceのように安定しています。時々データが無くなります。