いくつかのデータベースの大規模なデータの大量挿入を実現します。


Sql Serverサポートデータの大量挿入だけは知っていましたが、Oracle、SQLite、MySQLもサポートしていますが、OracleはOrace.DataAccesドライバを使う必要があります。今日はいくつかのデータベースの一括挿入解決方法を貼り付けます。
まず、IProviderに大量挿入を実現するためのプラグインサービスインターフェースIBacher Providerがあります。このインターフェースは前の記事ですでに述べました。

/// <summary> 
  ///            。 
  /// </summary> 
  public interface IBatcherProvider : IProviderService 
  { 
    /// <summary> 
    ///   <see cref="DataTable"/>             。 
    /// </summary> 
    /// <param name="dataTable">       <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">         。</param> 
    void Insert(DataTable dataTable, int batchSize = 10000); 
  } 
一、Sql Serverデータ一括挿入
Sql Serverの大量挿入は簡単で、Sql BulkCopyを使えばいいです。以下はこの種類の実現です。

/// <summary> 
  ///   System.Data.SqlClient             。 
  /// </summary> 
  public sealed class MsSqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    ///               。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    ///   <see cref="DataTable"/>             。 
    /// </summary> 
    /// <param name="dataTable">       <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">         。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          //          
          var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); 
          using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) 
            { 
              DestinationTableName = tableName,  
              BatchSize = batchSize 
            }) 
          { 
            //     , bulk     
            dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); 
            bulk.WriteToServer(dataTable); 
            bulk.Close(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
  } 
以上は事務を使用していません。使用事務は性能に影響があります。事務を使うなら、Sql BulkCopyOptions.UseInternal Transationを設定できます。
二、Oracleデータ一括挿入
System.Data.OracleClientは大量挿入に対応していないので、Oracle.DataAccessコンポーネントを提供者として使用するしかないです。

/// <summary> 
  /// Oracle.Data.Access               。 
  /// </summary> 
  public sealed class OracleAccessBatcher : IBatcherProvider 
  { 
    /// <summary> 
    ///               。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    ///   <see cref="DataTable"/>             。 
    /// </summary> 
    /// <param name="dataTable">       <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">         。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    ///        sql  。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      //   DataTable            
      var data = table.ToArray(); 
 
      //  ArrayBindCount   
      command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null); 
 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      for (var i = 0; i < table.Columns.Count; i++) 
      { 
        var column = table.Columns[i]; 
 
        var parameter = database.Provider.DbProviderFactory.CreateParameter(); 
        if (parameter == null) 
        { 
          continue; 
        } 
        parameter.ParameterName = column.ColumnName; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = column.DataType.GetDbType(); 
        parameter.Value = data[i]; 
 
        if (names.Length > 0) 
        { 
          names.Append(","); 
          values.Append(","); 
        } 
        names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); 
        values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
 
        command.Parameters.Add(parameter); 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  } 
以上の最も重要なステップは、DataTableを配列の配列表現、すなわちobject[]に変換し、前の配列の上付きが列の個数であり、後の配列が行の個数であるため、循環Columnsは後の配列をParameeterの値とし、つまりパラメータの値は配列である。insert文は普通の挿入語句と違います。
三、SQLiteデータ一括挿入
SQLiteのロット挿入は、トランザクションを開くだけでいいです。この具体的な原理は分かりません。

public sealed class SQLiteBatcher : IBatcherProvider 
  { 
    /// <summary> 
    ///               。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    ///   <see cref="DataTable"/>             。 
    /// </summary> 
    /// <param name="dataTable">       <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">         。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        DbTransaction transcation = null; 
        try 
        { 
          connection.TryOpen(); 
          transcation = connection.BeginTransaction(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
 
            var flag = new AssertFlag(); 
            dataTable.EachRow(row => 
              { 
                var first = flag.AssertTrue(); 
                ProcessCommandParameters(dataTable, command, row, first); 
                command.ExecuteNonQuery(); 
              }); 
          } 
          transcation.Commit(); 
        } 
        catch (Exception exp) 
        { 
          if (transcation != null) 
          { 
            transcation.Rollback(); 
          } 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) 
    { 
      for (var c = 0; c < dataTable.Columns.Count; c++) 
      { 
        DbParameter parameter; 
        //      ,        
        if (first) 
        { 
          parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); 
          parameter.ParameterName = dataTable.Columns[c].ColumnName; 
          command.Parameters.Add(parameter); 
        } 
        else 
        { 
          parameter = command.Parameters[c]; 
        } 
        parameter.Value = row[c]; 
      } 
    } 
 
    /// <summary> 
    ///        sql  。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DataTable table) 
    { 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var flag = new AssertFlag(); 
      table.EachColumn(column => 
        { 
          if (!flag.AssertTrue()) 
          { 
            names.Append(","); 
            values.Append(","); 
          } 
          names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); 
          values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
        }); 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  } 
四、MySqlデータ一括挿入

/// <summary> 
  ///   MySql.Data               。 
  /// </summary> 
  public sealed class MySqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    ///               。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    ///   <see cref="DataTable"/>             。 
    /// </summary> 
    /// <param name="dataTable">       <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">         。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    ///        sql  。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var types = new List<DbType>(); 
      var count = table.Columns.Count; 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      table.EachColumn(c => 
        { 
          if (names.Length > 0) 
          { 
            names.Append(","); 
          } 
          names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); 
          types.Add(c.DataType.GetDbType()); 
        }); 
 
      var i = 0; 
      foreach (DataRow row in table.Rows) 
      { 
        if (i > 0) 
        { 
          values.Append(","); 
        } 
        values.Append("("); 
        for (var j = 0; j < count; j++) 
        { 
          if (j > 0) 
          { 
            values.Append(", "); 
          } 
          var isStrType = IsStringType(types[j]); 
          var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); 
          if (parameter != null) 
          { 
            values.Append(parameter.ParameterName); 
            command.Parameters.Add(parameter); 
          } 
          else if (isStrType) 
          { 
            values.AppendFormat("'{0}'", row[j]); 
          } 
          else 
          { 
            values.Append(row[j]); 
          } 
        } 
        values.Append(")"); 
        i++; 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
 
    /// <summary> 
    ///           。 
    /// </summary> 
    /// <param name="dbType"></param> 
    /// <returns></returns> 
    private bool IsStringType(DbType dbType) 
    { 
      return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; 
    } 
 
    /// <summary> 
    ///     。 
    /// </summary> 
    /// <param name="provider"></param> 
    /// <param name="isStrType"></param> 
    /// <param name="dbType"></param> 
    /// <param name="value"></param> 
    /// <param name="parPrefix"></param> 
    /// <param name="row"></param> 
    /// <param name="col"></param> 
    /// <returns></returns> 
    private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) 
    { 
      //         ,      ,  ,          (  ' )           
      if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime) 
      { 
        var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); 
        var parameter = provider.DbProviderFactory.CreateParameter(); 
        parameter.ParameterName = name; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = dbType; 
        parameter.Value = value; 
        return parameter; 
      } 
      return null; 
    } 
  } 
MySqlの一括挿入は、値をすべて文のvaluesに書いています。例えば、insert batch(id,name)values(1,'1',2',2',3',3',....10')。
五、テスト
次にテスト用の例を書いて、大量挿入の効果を見てみます。

    public void TestBatchInsert() 
    { 
      Console.WriteLine(TimeWatcher.Watch(() => 
        InvokeTest(database => 
          { 
            var table = new DataTable("Batcher"); 
            table.Columns.Add("Id", typeof(int)); 
            table.Columns.Add("Name1", typeof(string)); 
            table.Columns.Add("Name2", typeof(string)); 
            table.Columns.Add("Name3", typeof(string)); 
            table.Columns.Add("Name4", typeof(string)); 
 
            //  100000    
            for (var i = 0; i < 100000; i++) 
            { 
              table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString()); 
            } 
 
            //   IBatcherProvider 
            var batcher = database.Provider.GetService<IBatcherProvider>(); 
            if (batcher == null) 
            { 
              Console.WriteLine("       。"); 
            } 
            else 
            { 
              batcher.Insert(table); 
            } 
 
            //  batcher      
            var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher"); 
            Console.WriteLine("     {0}    ", database.ExecuteScalar(sql)); 
 
          }))); 
    } 
以下の表には4つのデータベースが10万本のデータを生成するのに時間がかかります。
データベース
時間を費やす
Ms Sql
00:00:02.376300
Oracle
00:00:01.51559
SQLite
00:00:01.675634
MySql
00:00:05.45166891
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。