MySQL学習(三)Spark(Scala)でMySQLに書き込む二つの方法
3591 ワード
package total
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
/**
 * Spark job that counts rows per (provincename, cityname) in a Parquet dataset
 * and writes the aggregated result to a MySQL table over JDBC.
 *
 * Expected command-line arguments: `inputFile outputFile`.
 * Only `inputFile` is read by this job; `outputFile` is still required for
 * backward compatibility with existing launch scripts but is not used, because
 * the result is written to MySQL rather than to a file.
 */
object CountProvince {

  // NOTE(review): the JDBC endpoint and credentials are hard-coded. Consider
  // moving them to external configuration before running this anywhere but a
  // throw-away test environment.
  private val JdbcUrl = "jdbc:mysql://192.168.123.102:3306/test"
  private val JdbcUser = "root"
  private val JdbcPassword = "root"

  /** MySQL table that receives the aggregated counts. */
  private val TargetTable = "count"

  /** Name of the temporary view the Parquet data is registered under. */
  private val TempView = "countProvince"

  /** Text printed when the job is started with the wrong number of arguments. */
  private val Usage: String =
    """
      |total.CountProvince
      |
      |
      |""".stripMargin

  /**
   * Aggregation query: one row per (provincename, cityname) with its row count,
   * ordered by provincename. The count column is left un-aliased so the column
   * name of the written table stays whatever Spark derives for `count(*)`.
   */
  private val CountSql: String =
    """
      |select
      |count(*),provincename, cityname
      |from
      |countProvince
      |group by
      |provincename, cityname
      |order by
      |provincename""".stripMargin

  /**
   * Entry point.
   *
   * @param args exactly two values: the Parquet input path and an (unused)
   *             output path. Any other argument count prints the usage text
   *             and terminates the JVM with exit status 1.
   */
  def main(args: Array[String]): Unit = args match {
    case Array(inputFile, _) =>
      run(inputFile)
    case _ =>
      println(Usage)
      // Non-zero status so schedulers and shell scripts see the failure.
      sys.exit(1)
  }

  /**
   * Creates the SparkSession, runs the aggregation and the JDBC write, and
   * always stops the session afterwards, even if one of the steps throws.
   *
   * @param inputFile path of the Parquet dataset to aggregate
   */
  private def run(inputFile: String): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    val session = SparkSession.builder().config(conf).getOrCreate()
    try {
      writeToMysql(countByProvinceAndCity(session, inputFile))
    } finally {
      session.stop()
    }
  }

  /**
   * Registers the Parquet data as a temporary view and runs [[CountSql]] on it.
   *
   * @param session   active SparkSession
   * @param inputFile path of the Parquet dataset
   * @return the (lazy) DataFrame produced by the aggregation query
   */
  private def countByProvinceAndCity(session: SparkSession, inputFile: String): DataFrame = {
    session.read.parquet(inputFile).createOrReplaceTempView(TempView)
    session.sql(CountSql)
  }

  /**
   * Writes the given DataFrame to [[TargetTable]] through JDBC.
   *
   * No save mode is set explicitly, so the DataFrameWriter default applies
   * (NOTE(review): per the Spark documentation the default fails when the
   * target table already exists — confirm this is the intended behaviour for
   * repeated runs).
   *
   * @param counts aggregated result to persist
   */
  private def writeToMysql(counts: DataFrame): Unit = {
    val properties = new Properties()
    properties.setProperty("user", JdbcUser)
    properties.setProperty("password", JdbcPassword)
    counts.write.jdbc(JdbcUrl, TargetTable, properties)
  }

  // ---------------------------------------------------------------------------
  // Alternative approach (disabled): compute the counts from a comma-separated
  // text file with the RDD API, turn the result into a DataFrame via an explicit
  // schema, and write it to the MySQL table "count2".
  //
  // val conf = new SparkConf().setMaster("local[2]").setAppName("CountProvince")
  //// val sc = new SparkContext(conf)
  // val Spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  // val sc = Spark.sparkContext
  //
  // // Read the raw text input and echo every line.
  // val inputPath = sc.textFile("F:\\Scala\\Spark\\ -dmp \\ \\data.txt")
  // inputPath.foreach(println(_))
  //
  // // Count occurrences per "field24:field25" key.
  // // NOTE(review): the literal 1 makes the count an Int, but the schema below
  // // declares LongType; use 1L (or IntegerType) before re-enabling this code.
  // val countRDD = inputPath
  // .map(line => {
  // val fields = line.split(",")
  // (fields(24) + ":" + fields(25), 1)
  // }).reduceByKey(_ + _)
  //
  // // Convert each (key, count) tuple into a Row.
  // val RowRdd: RDD[Row] = countRDD.map(tuple => {
  // val diming = tuple._1
  // val count = tuple._2
  // Row(diming, count)
  // })
  //
  // // Schema describing the two Row fields.
  // val schema: StructType = StructType(
  // StructField("diming", StringType, true) ::
  // StructField("count", LongType, true) :: Nil
  // )
  //
  // // Combine the Row RDD and the schema into a DataFrame.
  // val df: DataFrame = Spark.createDataFrame(RowRdd,schema)
  //
  // // Register the DataFrame as a temporary view.
  // df.createOrReplaceTempView("countProvince")
  //
  // // Write the DataFrame to MySQL.
  // val url = "jdbc:mysql://192.168.123.102:3306/test"
  // val properties = new Properties()
  // properties.put("user","root")
  // properties.put("password","root")
  // df.write.jdbc(url,"count2",properties)
  // ---------------------------------------------------------------------------
}