データベース接続プールsparkデータベースへの書き込みを実現
3049 ワード
sparkバッチデータがデータベースに書き込まれたブックマルチスレッドの場合、データベース接続プールを作成してデータベースConnectionの接続を最適化するプロファイルdb.properties
パラメータMAX-IDLE MIN-IDLE待ち略もあります常亮類Constans 定数が多い場合、Constansを個別にカプセル化すると拡張と修正が容易になります.接続プールConnectionPool コードコール
パーティション内のデータが大きい場合(よりデータの見積りが可能)、ps.addBatch()キャッシュが不足する可能性があります.キャッシュに追加された場合、ps.executeBatch()を5万または(推定値)で実行することができます.変数var count=0カウントを追加するだけでいい
パラメータMAX-IDLE MIN-IDLE待ち略もあります
jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/test
jdbc.user=spark
jdbc.password=spark
jdbc.max.active=10 //
public interface Constants {
String JDBC_DRIVER = "jdbc.driver";
String JDBC_URL = "jdbc.url";
String JDBC_USER = "jdbc.user";
String JDBC_PASSWORD = "jdbc.password";
String JDBC_MAX_ACTIVE = "jdbc.max.active";
}
public class ConnectionPool {
private static LinkedList pool = new LinkedList();
private ConnectionPool(){}
static {//
//
try {
Properties properties = new Properties();
properties.load(ConnectionPool.class.getClassLoader().getResourceAsStream("db.properties"));
Class.forName(properties.getProperty(Constants.JDBC_DRIVER));
int maxActive = Integer.valueOf(properties.getProperty(Constants.JDBC_MAX_ACTIVE));
String url = properties.getProperty(Constants.JDBC_URL);
String password = properties.getProperty(Constants.JDBC_PASSWORD);
String user = properties.getProperty(Constants.JDBC_USER);
//
for(int i = 0; i < maxActive; i++) {
pool.push(DriverManager.getConnection(url, user, password));
}
} catch (Exception e) {
throw new ExceptionInInitializerError(" ~");
}
}
//
public static Connection getConnection() {
while(pool.isEmpty()) {
try {
System.out.println(" , ~~~~~");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return pool.poll();
}
// spark , 。
public static void release(Connection connection) {
pool.push(connection);
}
}
private def foreachOps5(rbkRDD: RDD[(String, Int)]) = {
rbkRDD.foreachPartition(partition => {
val connection:Connection = ConnectionPool.getConnection()
val sql = "insert into wordcount(word, `count`) values(?, ?)"
val ps = connection.prepareStatement(sql)
//step 3
partition.foreach{case (word, count) => {
ps.setString(1, word)
ps.setInt(2, count)
ps.addBatch()
}}
ps.executeBatch()
ps.close()
ConnectionPool.release(connection)
})
}
パーティション内のデータが大きい場合(よりデータの見積りが可能)、ps.addBatch()キャッシュが不足する可能性があります.キャッシュに追加された場合、ps.executeBatch()を5万または(推定値)で実行することができます.変数var count=0カウントを追加するだけでいい