Spark Data FrameデータソースとしてMySQLを使用


この文書では、MySQLからデータを整理するDataFrameについて簡単に説明します.使用言語はspark自身がサポートするscalaです
一、環境準備
  • まず、環境の構成を含むsparkを正しくインストールしていることを確認します.
  • testDFというデータベースを構築し、次のデータid name age 1 chen 212 liang 22
  • を含むテーブルuserを作成する.
    二、MySQLテーブルからDataFrameを作成する
    1.sparkローカル単一プロセスモードを実行する:
    spark-shell --master local

    shellの情報から、SparkContextとSqlContextが用意されていることがわかります.
    このときすでにscalaの実行環境に入っており、scala文を直接入力して実行することができます.
    2.MySQLテーブルのデータを含むDataFrameを作成します.
    val tableDF = sqlContext.jdbc("jdbc:mysql://mysql_hostname:mysql_port/testDF?user=your_username&password=your_password", "user")

    shellに作成orgが表示する.apache.spark.sql.DataFrameが成功し、DataFrameのデータ構造が出力されます.
    jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]
    (作成に失敗して例外があった場合は、mysql-connector.jarが見つからなかったため、例外が発生した原因を確認してください)

    3.運転
    tableDF.show()

    shellに印刷されているのが見えます(もちろん、色はありません)
    id    name    age
    1    chen    21
    2    liang    22
    このように、DataFrameにはtestDFが既に含む.userテーブル!
    うんてん
    tableDF.printSchema()

    より詳細な構造が表示されます
    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
     |-- age: integer (nullable = false)

    三、DataFrameの操作
  • filter

  • DataFrame中のデータをフィルタリング、戻り値はフィルタリング後の行からなるDataFrameである.
    val frame1 = tableFrame.filter(tableFrame("age") === 21)    //     ,   "==="
    frame1.show

    出力結果は
      1   chen   21
    この記録は

    2.select
    指定列をDataFrameから選択する、戻り値は指定列からなるDataFrameである.
    val frame2 = tableFrame.select("name", "age")
    frame2.show

    name    age
    chen    21
    liang    22
    selectはこのように遊ぶこともできます.
    tableFrame.select(tableFrame("name"), tableFrame("age") + 1).show

    name     (age + 1)
    chen     22       
    liang    23    

    3.join
    接続操作、この関数には3つの署名があり、それぞれ
    //    (    )。        
    def join(right : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.DataFrame = { /* compiled code */ }
     
    //   。          
    def join(right : org.apache.spark.sql.DataFrame, joinExprs : org.apache.spark.sql.Column) : org.apache.spark.sql.DataFrame = { /* compiled code */ }
     
    //              
    def join(right : org.apache.spark.sql.DataFrame, joinExprs : org.apache.spark.sql.Column, joinType : scala.Predef.String) : org.apache.spark.sql.DataFrame = { /* compiled code */ }
          join,       address,       

    name    address
    chen    ShenZhen
    liang    ZhanJiang
    ステップ2のように、tableDF 2という名前のテーブルaddressのデータを記述するDataFrameを作成する.
    次に実行
    tableDF.join(tableDF2, tableDF("name") === tableDF2("name")).show

    出力が
    id name  age name  address  
    2  liang 22  liang ZhanJiang
    1  chen  21  chen  ShenZhen 

    4.group
    グループ化
    tableDF.groupBy("name").count().show

    出力が
    name  count
    liang 1       
    chen  1    
    count: Unit = ()
    5.sort
    tableDF.sort("age").show


    6.take
    先頭n行のデータを取り出し、戻りタイプはArray[ROW]
    tableDF.take(2)

    7.foreach
    DataFrame行ごとに同じ操作を行います
    tableDF.foreach(row => (println(row.getString(1))))    //           name. (    0  )

    待って...
    先発From:http://blog.chenzuhuang.com/archive/54.html