MySQL Learning (3): Two Ways to Write to MySQL from Spark (Scala)


package total
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object CountProvince {
  def main(args: Array[String]): Unit = {

    /**
      * First method: write to MySQL
      */


    /**
      * Validate the number of arguments
      */
    if (args.length < 2){
      println(
        """
          |total.CountProvince
          |expected 2 arguments
          |usage: <inputFile> <outputFile>
        """.stripMargin
      )
      System.exit(0)
    }
    /**
      * Receive the arguments
      */
    val Array(inputFile, outputFile) = args // outputFile is parsed but not used by the first method

    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}")
    //    val sc = new SparkContext(conf)
//    val input = ("F:\\Scala\\Spark\\    -dmp  \\  \\out1")
    /**
      * Create a SparkSession
      */
    val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
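    // Note (assumption): no master is set on the SparkConf here, so it is expected to be
    // supplied at submit time (spark-submit --master ...); for local testing,
    // conf.setMaster("local[*]") could be added, as the commented-out second method below
    // does with "local[2]".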
    /**
      * Read the parquet input file
      */
    val df: DataFrame = session.sqlContext.read.parquet(inputFile)

    /**
      * Register a temporary view
      */
    df.createOrReplaceTempView("countProvince")
    /**
      * Count records per province and city
      */
    val sql =
      """
          select
              count(*) as ct, provincename, cityname
          from
              countProvince
          group by
              provincename, cityname
          order by
              provincename"""
    val dff = session.sqlContext.sql(sql)
    val url = "jdbc:mysql://192.168.123.102:3306/test"
    val properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    dff.write.jdbc(url,"count",properties)
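    // Note (assumption): DataFrameWriter.jdbc defaults to SaveMode.ErrorIfExists, i.e. it
    // creates the "count" table and fails if the table already exists. A different save
    // mode can be chosen, for example:
    //   dff.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc(url, "count", properties)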
    


    /**
      * Second method: write to MySQL
      */
//    val conf = new SparkConf().setMaster("local[2]").setAppName("CountProvince")
////    val sc = new SparkContext(conf)
//    val Spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//    val sc = Spark.sparkContext
//    /**
//      * Read the data
//      */
//    val inputPath =  sc.textFile("F:\\Scala\\Spark\\    -dmp  \\  \\data.txt")
//    inputPath.foreach(println(_))
//    /**
//      * Compute the counts
//      */
//    val countRDD = inputPath
//      .map(line => {
//        val fields = line.split(",")
//        (fields(24) + ":" + fields(25), 1)
//      }).reduceByKey(_ + _)
//
//    /**
//      * Convert each (key, count) pair into a Row
//      */
//    val RowRdd: RDD[Row] = countRDD.map(tuple => {
//      val diming = tuple._1
//      val count = tuple._2
//      // convert the Int count to Long so it matches the LongType declared in the schema below
//      Row(diming, count.toLong)
//    })
//    /**
//      * Build the schema
//      */
//    val schema: StructType = StructType(
//      StructField("diming", StringType, true) ::
//        StructField("count", LongType, true) :: Nil
//    )
//    /**
//      * Create a DataFrame from the Row RDD and the schema
//      */
//    val df: DataFrame = Spark.createDataFrame(RowRdd,schema)
//
//    /**
//      * Register a temporary view
//      */
//    df.createOrReplaceTempView("countProvince")
//
//    /**
//      * Write to the MySQL database
//      */
//    val url = "jdbc:mysql://192.168.123.102:3306/test"
//    val properties = new Properties()
//    properties.put("user","root")
//    properties.put("password","root")
//    df.write.jdbc(url,"count2",properties)

  }
}
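
To run either variant, the MySQL JDBC driver must be on the Spark classpath. A minimal sketch of a submit command, assuming Spark 2.x, the connector pulled via --packages, and placeholder jar/input/output paths:

spark-submit \
  --class total.CountProvince \
  --master yarn \
  --packages mysql:mysql-connector-java:5.1.47 \
  count-province.jar \
  /path/to/input-parquet /path/to/output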