Spark Data FrameデータソースとしてMySQLを使用
4043 ワード
この文書では、MySQLからデータを整理するDataFrameについて簡単に説明します.使用言語はspark自身がサポートするscalaです
一、環境準備まず、環境の構成を含むsparkを正しくインストールしていることを確認します. testDFというデータベースを構築し、次のデータid name age 1 chen 212 liang 22 を含むテーブルuserを作成する.
二、MySQLテーブルからDataFrameを作成する
1.sparkローカル単一プロセスモードを実行する:
shellの情報から、SparkContextとSqlContextが用意されていることがわかります.
このときすでにscalaの実行環境に入っており、scala文を直接入力して実行することができます.
2.MySQLテーブルのデータを含むDataFrameを作成します.
shellに作成orgが表示する.apache.spark.sql.DataFrameが成功し、DataFrameのデータ構造が出力されます.
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]
(作成に失敗して例外があった場合は、mysql-connector.jarが見つからなかったため、例外が発生した原因を確認してください)
3.運転
shellに印刷されているのが見えます(もちろん、色はありません)
id name age
1 chen 21
2 liang 22
このように、DataFrameにはtestDFが既に含む.userテーブル!
うんてん
より詳細な構造が表示されます
root
|-- id: integer (nullable = false)
|-- name: string (nullable = false)
|-- age: integer (nullable = false)
三、DataFrameの操作 filter
DataFrame中のデータをフィルタリング、戻り値はフィルタリング後の行からなるDataFrameである.
出力結果は
1 chen 21
この記録は
2.select
指定列をDataFrameから選択する、戻り値は指定列からなるDataFrameである.
name age
chen 21
liang 22
selectはこのように遊ぶこともできます.
name (age + 1)
chen 22
liang 23
3.join
接続操作、この関数には3つの署名があり、それぞれ
name address
chen ShenZhen
liang ZhanJiang
ステップ2のように、tableDF 2という名前のテーブルaddressのデータを記述するDataFrameを作成する.
次に実行
出力が
id name age name address
2 liang 22 liang ZhanJiang
1 chen 21 chen ShenZhen
4.group
グループ化
出力が
name count
liang 1
chen 1
count: Unit = ()
5.sort
6.take
先頭n行のデータを取り出し、戻りタイプはArray[ROW]
7.foreach
DataFrame行ごとに同じ操作を行います
待って...
先発From:http://blog.chenzuhuang.com/archive/54.html
一、環境準備
二、MySQLテーブルからDataFrameを作成する
1.sparkローカル単一プロセスモードを実行する:
spark-shell --master local
shellの情報から、SparkContextとSqlContextが用意されていることがわかります.
このときすでにscalaの実行環境に入っており、scala文を直接入力して実行することができます.
2.MySQLテーブルのデータを含むDataFrameを作成します.
val tableDF = sqlContext.jdbc("jdbc:mysql://mysql_hostname:mysql_port/testDF?user=your_username&password=your_password", "user")
shellに作成orgが表示する.apache.spark.sql.DataFrameが成功し、DataFrameのデータ構造が出力されます.
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]
(作成に失敗して例外があった場合は、mysql-connector.jarが見つからなかったため、例外が発生した原因を確認してください)
3.運転
tableDF.show()
shellに印刷されているのが見えます(もちろん、色はありません)
id name age
1 chen 21
2 liang 22
このように、DataFrameにはtestDFが既に含む.userテーブル!
うんてん
tableDF.printSchema()
より詳細な構造が表示されます
root
|-- id: integer (nullable = false)
|-- name: string (nullable = false)
|-- age: integer (nullable = false)
三、DataFrameの操作
DataFrame中のデータをフィルタリング、戻り値はフィルタリング後の行からなるDataFrameである.
val frame1 = tableFrame.filter(tableFrame("age") === 21) // , "==="
frame1.show
出力結果は
1 chen 21
この記録は
2.select
指定列をDataFrameから選択する、戻り値は指定列からなるDataFrameである.
val frame2 = tableFrame.select("name", "age")
frame2.show
name age
chen 21
liang 22
selectはこのように遊ぶこともできます.
tableFrame.select(tableFrame("name"), tableFrame("age") + 1).show
name (age + 1)
chen 22
liang 23
3.join
接続操作、この関数には3つの署名があり、それぞれ
// ( )。
def join(right : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.DataFrame = { /* compiled code */ }
// 。
def join(right : org.apache.spark.sql.DataFrame, joinExprs : org.apache.spark.sql.Column) : org.apache.spark.sql.DataFrame = { /* compiled code */ }
//
def join(right : org.apache.spark.sql.DataFrame, joinExprs : org.apache.spark.sql.Column, joinType : scala.Predef.String) : org.apache.spark.sql.DataFrame = { /* compiled code */ }
join, address,
name address
chen ShenZhen
liang ZhanJiang
ステップ2のように、tableDF 2という名前のテーブルaddressのデータを記述するDataFrameを作成する.
次に実行
tableDF.join(tableDF2, tableDF("name") === tableDF2("name")).show
出力が
id name age name address
2 liang 22 liang ZhanJiang
1 chen 21 chen ShenZhen
4.group
グループ化
tableDF.groupBy("name").count().show
出力が
name count
liang 1
chen 1
count: Unit = ()
5.sort
tableDF.sort("age").show
6.take
先頭n行のデータを取り出し、戻りタイプはArray[ROW]
tableDF.take(2)
7.foreach
DataFrame行ごとに同じ操作を行います
tableDF.foreach(row => (println(row.getString(1)))) // name. ( 0 )
待って...
先発From:http://blog.chenzuhuang.com/archive/54.html