SparkSQL update操作を実行mysqlデータを修正する
1439 ワード
これはソースコードを変更する必要がない方法です.
中国語を書くと文字化けしが発生します.の場合、ご存じの方、教えてください
//user
case class User1(id: Long, name: String, password: String, imgUrl: String, update_date: String)
object SparkSQLUpdateMySQLOfJDBC {
def main(args: Array[String]): Unit = {
//SparkSession
val spark: SparkSession = SparkSession.builder()
.appName("SparkSqlToMysql")
.master("local")
.getOrCreate()
// json/csv
val df = spark.read.json("data/user.json")
df.show()
val data: Array[Row] = df.collect()
//
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/ssm?characterEcoding=UTF-8",
"root", "000000")
// SQL
val statement = connection.prepareStatement("update user set name=?, password=?, imgUrl=?, update_date=? where id=?")
//
val now: String = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now)
data.foreach(
u => {
// statement.setObject(sql , )
statement.setObject(1, u.getString(2))
statement.setObject(2, u.getString(3))
statement.setObject(3, u.getString(4))
statement.setObject(4, now)
statement.setObject(5, u.getLong(0))
statement.addBatch() //
}
)
// SQL
statement.executeBatch()
spark.stop()
}
}
中国語を書くと文字化けしが発生します.の場合、ご存じの方、教えてください