mysqlフロー式読み取りの大きいデータ量と大量挿入データ分析

8362 ワード

1、流式はjavaを読み取り、mysqlから大量のデータを読み取り、結果がmyqlサービスから戻ったらすぐに処理します。このように応用すると大量のメモリが必要なくなります。この場合はストリーミングで読み取るべきです。
PreparedStatement ps = connection.prepareStatement("select .. from ..", 
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); 

/*
TYPE_FORWARD_ONLY CONCUR_READ_ONLY mysql       ,             : PreparedStatement ps = connection.prepareStatement("select .. from .."); 
*/

//    jdbc url  defaultFetchSize     ,                    
ps.setFetchSize(Integer.MIN_VALUE); 
ResultSet rs = ps.executeQuery(); 

while (rs.next()) { 
  System.out.println(rs.getString("fieldName")); 
}

/*
mysql               ,     forward-only,read-only fatch size Integer.MIN_VALUE。          :
/**
 * We only stream result sets when they are forward-only, read-only, and the
 * fetch size has been set to Integer.MIN_VALUE
 *
 * @return true if this result set should be streamed row at-a-time, rather
 * than read all at once.
 */
protected boolean createStreamingResultSet() {
    try {
        synchronized(checkClosed().getConnectionMutex()) {
            return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY)
                 && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) 
                 && (this.fetchSize == Integer.MIN_VALUE));
        }
    } catch (SQLException e) {
        // we can't break the interface, having this be no-op in case of error is ok

        return false;
    }
}
*/
2、一括書込みは、アプリケーションが一本の実行insertでデータを書き込む場合、書き込みが遅いです。主な原因は、単一の書き込みの際にdb間の大量の要求応答インタラクションに適用する必要があるからです。各要求は独立した事務提出である。このようにネットワークの遅延が大きい場合、複数回の要求は大量の時間を消費するネットワークの遅延になります。二つ目は、トランザクションdbごとにディスク操作を更新して事務ログを書くので、事務の持続性を保証します。各トランザクションは1つのデータを書き込むだけなので、ディスクの利用率は高くありません。ディスクのioはブロックごとに来るので、連続して大量のデータを書き込むほうが効率がいいです。大量書き込みに変更しなければなりません。要求数と事務数を減らします。以下は一括挿入の例です。
int batchSize = 1000;
PreparedStatement ps = connection.prepareStatement("insert into tb1 (c1,c2,c3...) values (?,?,?...)");

for (int i = 0; i < list.size(); i++) {

    ps.setXXX(list.get(i).getC1());
    ps.setYYY(list.get(i).getC2());
    ps.setZZZ(list.get(i).getC3());

    ps.addBatch();

    if ((i + 1) % batchSize == 0) {
        ps.executeBatch();
    }
}

if (list.size() % batchSize != 0) {
    ps.executeBatch();
}
//  :jdbc     :rewriteBatchedStatements=true
上のコードの例は、1000個のデータごとに要求を送信することです。mysql駆動内部では、アプリケーション側で複数回のaddBatch()のパラメータをmulti valueのinsert文に統合してdbに送信して実行します。例えば、insert into tb 1(c 1、c 2、c 3)values(v 1、v 2、v 3)、(v 4、v 5、v 6)、(v 7、v 8、v 9)…といったように、一つずつのinsertより明らかに少ない要求ができます。ネットワーク遅延消費時間とディスクio時間が減少し、tpsが向上しました。
3、コードの展示
public class TestInsert {

    public static void main(String[] args) throws SQLException {

        int batchSize = 1000;
        int insertCount = 1000;

        testDefault(batchSize, insertCount);

        testRewriteBatchedStatements(batchSize,insertCount);

    }

    //      
    private static void testDefault(int batchSize, int insertCount) throws SQLException{  

        long start = System.currentTimeMillis();

        doBatchedInsert(batchSize, insertCount,"");

        long end = System.currentTimeMillis();
        System.out.println("default:" + (end -start) + "ms");
    }


    //    
    private static void testRewriteBatchedStatements(int batchSize, int insertCount) throws SQLException {

        long start = System.currentTimeMillis();

        doBatchedInsert(batchSize, insertCount, "rewriteBatchedStatements=true");

        long end = System.currentTimeMillis();
        System.out.println("rewriteBatchedStatements:" + (end -start) + "ms");
    }


    private static void doBatchedInsert(int batchSize, int insertCount, String mysqlProperties) throws SQLException {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://ip:3306/test?" + mysqlProperties);
        dataSource.setUsername("name");
        dataSource.setPassword("password");

        dataSource.init();

        Connection connection = dataSource.getConnection();

        PreparedStatement preparedStatement = connection.prepareStatement("insert into Test (name,gmt_created,gmt_modified) values (?,now(),now())");

        for (int i = 0; i < insertCount; i++) {
            preparedStatement.setString(1, i+" ");
            preparedStatement.addBatch();
            if((i+1) % batchSize == 0) {
                preparedStatement.executeBatch();
            }
        }
        preparedStatement.executeBatch();

        connection.close();   
        dataSource.close();
    }

}