SparkSQLベースプログラミング


紹介します
Spark Coreでは、アプリケーションを実行するには、まずコンテキスト環境オブジェクトSpark Cotextを構築する必要があります。Spark SQLは、Spark Coreのパッケージとして理解できます。モデルだけでなく、コンテキスト環境オブジェクトもパッケージ化されています。
古いバージョンでは、SparkSQLは二つのSQLクエリの開始点を提供します。一つはSQLContextといい、Spark自身が提供するSQLクエリに使います。一つはHveContextといいます。Hiveのクエリを接続するために使います。
SparkSessionはSparkの最新のSQLクエリの開始点であり、実質的にSQLContextとHveContectの組み合わせであるため、SQLContextとHveContextで利用できるAPIはSparkSessionでも同様に使用できる。SparkSessionの内部にSparkContectが実装されていますので、計算は実際にSparkContectで行われています。私たちはspark-shellを使うと、sparkはsparkというSpark Sessionを自動的に作成します。以前のように、自動的にscを取得してSparkContectを表します。
二、Data Frame
2.1 dataframe紹介
Sparkでは、DataFrameはRDDをベースとした分散データセットであり、従来のデータベースにおける二次元テーブルと同様である。DataFrameとRDDの主な違いは、前者はschema元情報、すなわちDataFrameが表す二次元テーブルデータセットの各列に名前とタイプがついていることである。これによりSpark SQLはより多くの構造情報を洞察し、DataFrameの背後に隠されているデータソースとDataFrameの上に作用する変換を対象とした最適化を行い、最終的に運転時の効率を大幅に向上させる目標を達成した。逆にRDDを見ると、保存されているデータ要素の具体的な内部構造が分かりませんので、Spark Coreはステージレベルで簡単で通用するライン最適化しかできません。また、Hiveと同様に、DataFrameもネストデータタイプ(struct、array、map)に対応しています。APIの使いやすさの観点から、Data Frame APIが提供するのはトップレベルの関係操作であり、関数式のRDD APIよりも友好的で、敷居が低い。
2.2 dataframe使用
1、dataframeを作成する(1)SparkデータソースからAを作成し、Sparkがファイル作成に対応するデータソースフォーマットを確認する。
scala> spark.read.
csv      jdbc   load     options   parquet   table   textFile   
format   json   option   orc       schema    text               
B、user.jsonファイルを作成する
[root@hadoop151 data]# cat user.json 
{"username":"zhangsan","age":20}
C、jsonファイルを読み込みdataframeを作成する
scala> var df = spark.read.json("/opt/module/spark/data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]         
+---+--------+                                                                  
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
(2)RDDから変換する
(3)Hive Tableからクエリーを返します。
2、SQL構文SQL構文スタイルとは、データを検索する時にSQL文を使って照会することであり、このようなスタイルのクエリーは、一時的なビューまたはグローバルビューでサポートする必要があります。A、JSONファイルを読み込み、DataFrameを作成する
scala> val df = spark.read.json("/opt/module/spark/data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string] 
B、Data Frameに対して臨時表を作成する
scala> df.createOrReplaceTempView("people")
C、SQL文によるクエリーの全表
scala> val sqlDF = spark.sql("select * from people")
D、結果の展示
scala> sqlDF.show
+---+--------+                                                                  
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
注意:一般的な臨時表はSessionの範囲内であり、適用範囲内で有効であれば、グローバル臨時表を使用することができます。グローバル臨時表を使う時はフルパスアクセスが必要です。temp.people
E、DataFrameに対してグローバルテーブルを作成する
df.createGlobalTempView("people")
F、SQL文によるクエリーの全表
spark.sql("SELECT * FROM global_temp.people").show()
三、DataSet
3.1 dataset紹介
DataSetは分散データセットです。DataSetはSpark 1.6に追加された新しい抽象であり、DataFrameの拡張である。これはRDDの利点(強いタイプ、強力なラボダ関数を使用する能力)とSpark SQL最適化実行エンジンの利点を提供します。DataSetは、機能的な変換(操作map、flatMap、filterなど)を使用することもできる。1、DataSetはDataFrame APIの拡張であり、SparkSQLの最新のデータ抽象である。2、ユーザーフレンドリーなAPIスタイルは、タイプのセキュリティチェックとDataFrameのクエリ最適化特性を持っています。3、サンプルクラスでデータを定義する構造情報に対して、サンプルクラスの各属性の名前を直接にDataSetのフィールド名にマッピングします。4、DataSetは強いタイプで、例えばDataSet、DataSet[Person]があります。5、DataFrameはDataSetの特例で、DataFrame=DataSet[Row]ですので、as方法でDataFrameをDataSetに変換できます。Rowはタイプで、Car、Personなどのタイプと同じで、すべてのテーブル構造情報はRowで表されています。データの取得には順序指定が必要です。
3.2 dataset使用
1、サンプルのシーケンスを使ってDataSetを作成する

scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan|  2|
+--------+---+
2、基本タイプのシーケンスを使ってDataSetを作成する
scala>  val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala>  ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+
四、RDD、DataFrame、DataSetの三者の関係
4.1紹介
SparkSQLでSparkはDataFrameとDataSetの2つの新しい抽象を提供してくれました。彼らはRDDと何の違いがありますか?まず、バージョンの生成から見ますと、Spark 1.0=>RDD Spark 1.3=>DataFrame Spark 1.6=>Datasetは、同じデータをこの3つのデータ構造に与えたら、それぞれ計算した後、同じ結果を示します。彼らの実行効率と実行方法は違っています。後期のSparkバージョンでは、DataSetはRDDとDataFrameの代わりに徐々に唯一のAPIインターフェースになるかもしれない。
4.2共通性
1、RDD、DataFrame、DataSetはすべてsparkプラットフォームの下の分散式弾性データセットであり、超大型データの処理に便利である。2、3つのいずれも惰性の機構があり、map方法のような作成、変換を行っている場合、直ちに実行しないで、アクションがforeachのようなものに出会った時だけ、3つの者は演算を遍歴することができます。3、三者はfilter、並べ替えなど多くの共通の関数があります。4、DataFrameとDatasetを操作するには多くの操作が必要です。このカバンは、import spark.implicits.u(SparkSessionオブジェクトを作成した後、できるだけ直接導入する)5、3つはSparkのメモリ状況によって自動的にキャッシュされます。データ量が多くても、メモリが溢れ出す心配がありません。6、三者ともパーティーの概念があります。7、DataFrameとDataSetは、モードマッチングを使用して各フィールドの値とタイプを取得することができます。
4.3違い
1、RDD(1)RDDは一般的にspark mlibと同時に使用されています(2)RDDはsparksql操作に対応していません。
2、DataFrame(1)はRDDとDatasetとは異なり、DataFrameの各行のタイプはRowに固定されており、各列の値は直接アクセスできず、解析によって各フィールドの値を取得することができる。(2)DataFrameとDataSetは一般的にspark mlibと同時に使用しない。(3)DataFrameとDataSetは、select、groupbyなどのSparkSQLの操作をサポートしています。臨時テーブル/ウィンドウを登録して、sql文操作を行うこともできます。(4)DataFrameとDataSetは、csvとして保存するなど、特別に便利な保存方法をサポートしています。表のヘッダを持つことができます。このように、各列のフィールド名は一目瞭然です。
3、DataSet(1)DatasetとDataFrameはまったく同じメンバー関数を持っています。違いは1行あたりのデータタイプが違います。DataFrameとは、DataSetの特殊なタイプのDataFrame=Dataset[Row]です。(2)DataFrameはDataset[Row]ともいい、各行のタイプはRowであり、解析しないで、どのフィールドが各行にあるかは、各フィールドにはどのタイプがあるかは分かりません。上記のgetAS方法またはモードマッチングで特定のフィールドを取り出すしかないです。Datasetでは、各行がどのタイプかは必ずしも決まっていません。case classをカスタマイズした後、各行の情報を自由に入手できます。
4.4相互変換
IDEA中開発のプログラムでRDDとDFまたはDSの間の相互操作が必要であれば、import spark.implicits.uを導入する必要があります。ここのsparkはScalaのパッケージ名ではなく、作成したsparkSessionオブジェクトの変数名ですので、まずSparkSessionオブジェクトを作成してから導入しなければなりません。ここのsparkオブジェクトはvar声明を使用できません。Scaraはval修飾の対象の導入だけをサポートしています。spark-shellには導入する必要がなく、自動的にこの操作を完了します。
1、rddとdataframe(1)rdd->dataframeは、一般的にサンプルクラスで変換されます。
scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t => User(t._1,t._2)).toDF.show
+--------+---+                                                                  
|    name|age|
+--------+---+
|zhangsan| 30|
|    lisi| 40|
+--------+---+
(2)dataframe->rdd
scala> case class User(name:String, age:Int)
defined class User

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])        

scala> array(0)
res1: org.apache.spark.sql.Row = [zhangsan,30]

scala> array(0)(0)
res2: Any = zhangsan

scala> array(0).getAs[String]("name")
res3: String = zhangsan
2、rddとdataset(1)rdd->dataset
scala> case class User(name:String, age:Int)
defined class User

scala>  sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
res4: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
(2)dataset->rdd
scala> case class User(name:String, age:Int)
defined class User

scala> val ds = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val rdd = ds.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[19] at rdd at <console>:25

scala> rdd.collect
res5: Array[User] = Array(User(zhangsan,30), User(lisi,49))
3、dataframeとdataset(1)dataframe->dataset
scala> case class User(name:String, age:Int)
defined class User

scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
(2)dataset->dataframe
scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]