Several ways to write to MySQL from Spark, one per scenario


Method 1: every field is fixed in advance

If the target table's columns line up exactly with the DataFrame's columns, the built-in JDBC writer is the simplest route.
import org.apache.spark.sql.SaveMode

// Credentials for the target MySQL instance
val prop = new java.util.Properties
prop.setProperty("user", "root")
prop.setProperty("password", "123456")

// Append each DataFrame to its table; the DataFrame's columns must
// match the table's columns by name and type
df1.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_code_phone", prop)
df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_params_phone", prop)
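
For context, a minimal sketch of how a DataFrame like df1 might be produced. The original post never shows this, so the column names and sample values here are placeholders, not part of the source:

import spark.implicits._ // assumes a SparkSession named spark, as in method 3

// Hypothetical input; column names must match the MySQL table
val df1 = Seq(
  (200, "13800000000", 3L),
  (404, "13900000000", 1L)
).toDF("code", "phone", "num")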

Method 2: fields can be added or removed freely

Here each row is inserted through plain JDBC, so the INSERT statement (and therefore the column list) can be changed at will. A safer variant with a PreparedStatement follows the snippet.
    import java.util.UUID

    df.foreachPartition(p => {
      // Borrow one connection per partition from the pool
      val conn = ConnectionPool.getConnection
      val stmt = conn.createStatement
      p.foreach(x => {
        // Build the INSERT by hand; columns can be added or removed freely
        val sql = "insert into app_id(id,date,appid,num) values (" +
          "'" + UUID.randomUUID + "'," +
          "'" + x.getInt(0) + "'," +
          "'" + x.getString(1) + "'," +
          "'" + x.getLong(2) + "'" +
          ")"
        stmt.executeUpdate(sql)
      })
      stmt.close()
      // Return the connection to the pool instead of closing it
      ConnectionPool.returnConnection(conn)
    })
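
The string-concatenated SQL above breaks as soon as a field contains a quote. A minimal sketch of the same write using a PreparedStatement with batching; the table and columns match the snippet above, everything else is an assumption:

    import java.util.UUID

    df.foreachPartition(p => {
      val conn = ConnectionPool.getConnection
      // Placeholders avoid quoting problems and let the driver batch inserts
      val ps = conn.prepareStatement(
        "insert into app_id(id,date,appid,num) values (?,?,?,?)")
      p.foreach(x => {
        ps.setString(1, UUID.randomUUID.toString)
        ps.setInt(2, x.getInt(0))
        ps.setString(3, x.getString(1))
        ps.setLong(4, x.getLong(2))
        ps.addBatch()
      })
      ps.executeBatch()
      ps.close()
      ConnectionPool.returnConnection(conn)
    })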
The ConnectionPool helper used above (Java):

package com.prince.spark.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

/**
 * A very simple, lazily initialized pool of five connections.
 * Note: poll() returns null once all connections are borrowed.
 */
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            // Register the MySQL JDBC driver
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection() {
        try {
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://192.168.1.97:3306/xiang_log?characterEncoding=utf8",
                            "root",
                            "123456"
                    );
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
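
Because poll() hands out null once all five connections are borrowed, a heavily parallel stage can hit a NullPointerException. A hedged sketch of a blocking variant in Scala, built on java.util.concurrent.LinkedBlockingQueue so callers wait instead; the object name and pool size are assumptions:

import java.sql.{Connection, DriverManager}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical blocking pool: take() waits until a connection is
// returned, so exhaustion blocks the caller instead of yielding null
object BlockingConnectionPool {
  Class.forName("com.mysql.jdbc.Driver")

  private val queue = new LinkedBlockingQueue[Connection](5)
  (1 to 5).foreach { _ =>
    queue.put(DriverManager.getConnection(
      "jdbc:mysql://192.168.1.97:3306/xiang_log?characterEncoding=utf8",
      "root", "123456"))
  }

  def getConnection: Connection = queue.take()
  def returnConnection(conn: Connection): Unit = queue.put(conn)
}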

Method 3: the values to write are computed locally, so assemble the DataFrame yourself

When the result of a computation is not already a DataFrame, build one from an RDD plus an explicit schema, then write it with the JDBC data source.
import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}

// Build an RDD from the computed values
val arrayRDD = sc.parallelize(List((num, log_date)))
// Convert it to an RDD[Row]
val resultRowRDD = arrayRDD.map(p => Row(
  p._1.toInt,
  p._2.toString,
  new Timestamp(new java.util.Date().getTime)
))
// Declare the schema with StructType, matching the Row fields in order
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, true),   // computed count
    StructField("log_date", StringType, true),      // log date
    StructField("create_time", TimestampType, true) // write timestamp
  )
)
// Assemble the DataFrame
val DF = spark.createDataFrame(resultRowRDD, resultSchema)
// Write it to MySQL
DF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify") // target table
  .option("user", "root")
  .option("password", "123456")
  .save()
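
If the result is just a handful of local values, the same table can be filled without hand-building Row objects and a schema. A minimal sketch using Spark's implicits, assuming num and log_date are in scope as in the snippet above:

import java.sql.Timestamp

import spark.implicits._

// Column names come from toDF; types are inferred from the tuple
val simpleDF = Seq(
  (num.toInt, log_date.toString, new Timestamp(System.currentTimeMillis()))
).toDF("verify_num", "log_date", "create_time")

simpleDF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify")
  .option("user", "root")
  .option("password", "123456")
  .save()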