データベース接続プールsparkデータベースへの書き込みを実現

3049 ワード

sparkバッチデータがデータベースに書き込まれたブックマルチスレッドの場合、データベース接続プールを作成してデータベースConnectionの接続を最適化する
  • プロファイルdb.properties

  • パラメータMAX-IDLE MIN-IDLE待ち略もあります
    jdbc.driver=com.mysql.jdbc.Driver
    jdbc.url=jdbc:mysql://localhost:3306/test
    jdbc.user=spark
    jdbc.password=spark
    jdbc.max.active=10  //     
    
  • 常亮類Constans
  • 定数が多い場合、Constansを個別にカプセル化すると拡張と修正が容易になります.
    public interface Constants {
        String JDBC_DRIVER = "jdbc.driver";
        String JDBC_URL = "jdbc.url";
        String JDBC_USER = "jdbc.user";
        String JDBC_PASSWORD = "jdbc.password";
        String JDBC_MAX_ACTIVE = "jdbc.max.active";
    }
    
  • 接続プールConnectionPool
  • public class ConnectionPool {
    
        private static LinkedList pool = new LinkedList();
        private ConnectionPool(){}
    
        static {//     
            //    
            try {
                Properties properties = new Properties();
                properties.load(ConnectionPool.class.getClassLoader().getResourceAsStream("db.properties"));
                Class.forName(properties.getProperty(Constants.JDBC_DRIVER));
                int maxActive = Integer.valueOf(properties.getProperty(Constants.JDBC_MAX_ACTIVE));
                String url = properties.getProperty(Constants.JDBC_URL);
                String password = properties.getProperty(Constants.JDBC_PASSWORD);
                String user = properties.getProperty(Constants.JDBC_USER);
                
                //       
                for(int i = 0; i < maxActive; i++) {
                    pool.push(DriverManager.getConnection(url, user, password));
                }
            } catch (Exception e) {
                throw new ExceptionInInitializerError("     ~");
            }
        }
        
    	//        
        public static Connection getConnection() {
            while(pool.isEmpty()) {
                try {
                    System.out.println("     ,     ~~~~~");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return pool.poll();
        }
        
    	// spark          ,    。          
        public static void release(Connection connection) {
            pool.push(connection);
        }
    }
    
  • コードコール
  • private def foreachOps5(rbkRDD: RDD[(String, Int)]) = {
        rbkRDD.foreachPartition(partition => {
            val connection:Connection = ConnectionPool.getConnection()
            val sql = "insert into wordcount(word, `count`) values(?, ?)"
            val ps = connection.prepareStatement(sql)
            //step 3
            partition.foreach{case (word, count) => {
                ps.setString(1, word)
                ps.setInt(2, count)
                ps.addBatch()
            }}
            ps.executeBatch()
            ps.close()
            ConnectionPool.release(connection)
        })
    }
    

    パーティション内のデータが大きい場合(よりデータの見積りが可能)、ps.addBatch()キャッシュが不足する可能性があります.キャッシュに追加された場合、ps.executeBatch()を5万または(推定値)で実行することができます.変数var count=0カウントを追加するだけでいい