SQL Server大量挿入データの完璧な解決策


一、Sql Server挿入案の紹介SqlServer一括挿入の方式については、3つの一般的な挿入方式があり、InsertBatchInsertSqlBulkCopy、以下の3つの方式の速度を比較します。
1.一般的なInsert挿入方法

public static void Insert(IEnumerable<Person> persons)
{
  using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
  {
    con.Open();
    foreach (var person in persons)
    {
      using (var com = new SqlCommand(
        "INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)",
        con))
      {
        com.Parameters.AddRange(new[]
        {
          new SqlParameter("@Id", SqlDbType.BigInt) {Value = person.Id},
          new SqlParameter("@Name", SqlDbType.VarChar, 64) {Value = person.Name},
          new SqlParameter("@Age", SqlDbType.Int) {Value = person.Age},
          new SqlParameter("@CreateTime", SqlDbType.DateTime)
            {Value = person.CreateTime ?? (object) DBNull.Value},
          new SqlParameter("@Sex", SqlDbType.Int) {Value = (int)person.Sex},
        });
        com.ExecuteNonQuery();
      }
    }
  }
}
2.スプライスBatchInsert挿入文

public static void BatchInsert(Person[] persons)
{
  using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
  {
    con.Open();
    var pageCount = (persons.Length - 1) / 1000 + 1;
    for (int i = 0; i < pageCount; i++)
    {
      var personList = persons.Skip(i * 1000).Take(1000).ToArray();
      var values = personList.Select(p =>
        $"({p.Id},'{p.Name}',{p.Age},{(p.CreateTime.HasValue ? $"'{p.CreateTime:yyyy-MM-dd HH:mm:ss}'" : "NULL")},{(int) p.Sex})");
      var insertSql =
        $"INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(",", values)}";
      using (var com = new SqlCommand(insertSql, con))
      {
        com.ExecuteNonQuery();
      }
    }
  }
}
3.SqlBulkCopyスキームを挿入する

public static void BulkCopy(IEnumerable<Person> persons)
{
  using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
  {
    con.Open();
    var table = new DataTable();
    table.Columns.AddRange(new []
    {
      new DataColumn("Id", typeof(long)), 
      new DataColumn("Name", typeof(string)), 
      new DataColumn("Age", typeof(int)), 
      new DataColumn("CreateTime", typeof(DateTime)), 
      new DataColumn("Sex", typeof(int)), 
    });
    foreach (var p in persons)
    {
      table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex});
    }

    using (var copy = new SqlBulkCopy(con))
    {
      copy.DestinationTableName = "Person";
      copy.WriteToServer(table);
    }
  }
}
3.3つの案の速度比較
シナリオ

時間
インセンス
1千条
145.4351 ms
Batch Insert
1千条
103.9 ms
Sql BulkCopy
1千条
7.021 ms
インセンス
1万条
151.266 ms
Batch Insert
1万条
8506.274 ms
Sql BulkCopy
1万条
30.129 ms
インセンス
10万本
13875.934 ms
Batch Insert
10万本
8278.9906 ms
Sql BulkCopy
10万本
314.8402 ms
両者の挿入効率の比較は、Insertは明らかにSqlBulkCopyより遅いです。20~40倍ぐらいの性能の差があります。次にSqlBulkCopyをカプセル化して、大量挿入をより便利にします。
二、Sql BulkCopyパッケージコード
1.方法紹介
一括挿入拡張方法の署名
方法
メソッドパラメータ
紹介する
BulkCopy
同期の一括挿入方法
Sql Connection connection
sql server接続先
IEnumerable<T>source
一括挿入が必要なデータソース
string tableble Name=null
表の名前を挿入します。【NULLのデフォルトはエンティティ名です。】
int bulk CopyTimeout=30
一括挿入タイムアウト時間
int batSize=0
データベースに書き込む一連の数(0なら全部を一括で挿入すること)が最適です。個人的には、バッチSize属性を1000行に設定してから、性能を見てみます。できれば、性能が下がるか、タイムアウトするまで、行数を倍にします。もしタイムアウトが1000で発生したら、行数を半分にします。例えば500まで減らします。
Sql BulkCopyOptions options=Sql BulkCopyOptions.Default
一括コピーパラメータ
Sql Transation external Transation=null
実行するトランザクションの対象
BulkCopyAync
非同期の一括挿入方法
Sql Connection connection
sql server接続先
IEnumerable<T>source
一括挿入が必要なデータソース
string tableble Name=null
表の名前を挿入します。【NULLのデフォルトはエンティティ名です。】
int bulk CopyTimeout=30
一括挿入タイムアウト時間
int batSize=0
データベースに書き込む一連の数(0なら全部を一括で挿入すること)が最適です。個人的には、バッチSize属性を1000行に設定してから、性能を見てみます。できれば、性能が下がるか、タイムアウトするまで、行数を倍にします。もしタイムアウトが1000で発生したら、行数を半分にします。例えば500まで減らします。
Sql BulkCopyOptions options=Sql BulkCopyOptions.Default
一括コピーパラメータ
Sql Transation external Transation=null
実行するトランザクションの対象
この方法は主に二つの問題を解決しました。
  • は、手動でDataTableまたはIDataReaderインターフェースの実装クラスを構築することを免除し、手動で構築した変換は比較的に維持しにくい。修正フィールドはこれらの箇所を全部修正しなければならない。特に、エニュメレート・タイプの特殊処理を彼の基本タイプに変換する必要がある(デフォルトint
  • は、SqlBulkCopyオブジェクトを直接作成することなく、データベース列のマッピングを配置し、いくつかの属性の構成
  • を有する。
    この方案も当社で使用して、会社の大量挿入データの需要を満足させます。例えば、第三者の対帳データはこの方法でExpressionが動的にデータ変換関数を生成します。効率は手書きの元コードと同じです。元の手書きコードに比べて、余計な変換損失は小さいです。
    この方式は他のネットワークとは異なるものである。 Listに変換し、DataTableに書き込んでいくのではなく、SqlBulkCopyを実現するリーダパッケージIDataReaderを使用して、Listに1ラインのデータを挿入するごとにデータを変換することができる。SqlBulkCopy案とIDataReader案との比較の利点
    効率が高い:DataTable案はまず完全に変換してから、DataTableによってデータベースに書き込むことができます。SqlBulkCopy案は変換しながらIDataReaderにデータベースに書き込むことができます。
    占有メモリが少ない:SqlBulkCopy案はまず完全に変換してから、DataTableによってデータベースに書き込むことができます。大量のメモリを必要とします。SqlBulkCopy案は変換しながらIDataReaderに渡してデータベースに書き込むことができます。メモリを使いすぎる必要はありません。
    強い:書き込みながら変換するので、SqlBulkCopyからは、データを連続的に挿入する効果があります。
    2.原理の実現
    ①エンティティModelとテーブルマッピング
    データベーステーブルコード
    
    CREATE TABLE [dbo].[Person](
    	[Id] [BIGINT] NOT NULL,
    	[Name] [VARCHAR](64) NOT NULL,
    	[Age] [INT] NOT NULL,
    	[CreateTime] [DATETIME] NULL,
    	[Sex] [INT] NOT NULL,
    PRIMARY KEY CLUSTERED 
    (
    	[Id] ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    エンティティコード
    
    public class Person
    {
      public long Id { get; set; }
      public string Name { get; set; }
      public int Age { get; set; }
      public DateTime? CreateTime { get; set; }
      public Gender Sex { get; set; }
    }
    
    public enum Gender
    {
      Man = 0,
      Woman = 1
    }
  • はフィールドマップを作成します。「このフィールドがマッピングされていないと、データが位置を間違えてしまいます。タイプが正しくないとエラーが発生します。」
  • は、マッピングに使用されるEnumerableReaderタイプのSqlBulkCopy属性を作成して完了し、データ列とデータベース内の列のマッピング
  • 
    //        
    using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
    {
      foreach (var column in ModelToDataTable<TModel>.Columns)
      {
        //      
        copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
      }
    }
    ②エンティティがデータ行に変換される
    データをデータ行に変換するには、ColumnMappings+ で行われます。Expressionは、 の作成に必要なプログラムクラス、属性などの情報を取得するためのものである。Expressionは、高効率変換関数を生成するためのものであり、Expressionのタイプは、静的な汎関数特性を利用して、汎型パラメータのキャッシュ効果を実現する。ModelToDataTable<TModel>の静的構造関数では、変換関数が生成され、変換が必要な属性情報が取得され、静的読み取り専用フィールドに保存され、キャッシュが完了します。
    ③IDaaReaderを使ってデータを挿入する重負荷ModelToDataTable<TModel>は、モデルオブジェクトをローズマリーで読み出し、データラインに変換するためのEnumerableReaderインターフェースを実装した読取クラスである。IDataReaderによる読み取りが可能である。SqlBulkCopyは、3つの方法だけを呼び出すことができます。SqlBulkCopyGetOrdinalRead
  • のうち、GetValueは、最初の行で代表番号を読み取るだけです。(記入が必要です。GetOrdinalタイプのSqlBulkCopy属性)
  • において、ColumnMappings方法は次の行に反復し、Readを呼び出して、モデルオブジェクトをデータラインModelToDataTable<TModel>.ToRowData.Invoke()
  • に変換する。
  • において、object[]は、現在の行の下付き位置を指定する値
  • を取得する方法である。
    3.完全コード
    拡張方法クラス
    
     public static class SqlConnectionExtension
      {
        /// <summary>
        ///     
        /// </summary>
        /// <typeparam name="TModel">       </typeparam>
        /// <param name="source">          </param>
        /// <param name="connection">       </param>
        /// <param name="tableName">     【 NULL       】</param>
        /// <param name="bulkCopyTimeout">      </param>
        /// <param name="batchSize">         【   0         】     【        ,          。     ,   BatchSize     1000   ,         。    ,         (     2000、4000 ),         。  ,       1000,          (  500),        。】</param>
        /// <param name="options">      </param>
        /// <param name="externalTransaction">       </param>
        /// <returns>    </returns>
        public static int BulkCopy<TModel>(this SqlConnection connection,
          IEnumerable<TModel> source,
          string tableName = null,
          int bulkCopyTimeout = 30,
          int batchSize = 0,
          SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
          SqlTransaction externalTransaction = null)
        {
          //     
          using (var reader = new EnumerableReader<TModel>(source))
          {
            //        
            using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
            {
              //    
              copy.DestinationTableName = tableName ?? typeof(TModel).Name;
              //         
              copy.BatchSize = batchSize;
              //    
              copy.BulkCopyTimeout = bulkCopyTimeout;
              //      【                  ,            】【  :                    】
              foreach (var column in ModelToDataTable<TModel>.Columns)
              {
                //      
                copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
              }
              //          
              copy.WriteToServer(reader);
              //        
              return reader.Depth;
            }
          }
        }
    
        /// <summary>
        ///     -  
        /// </summary>
        /// <typeparam name="TModel">       </typeparam>
        /// <param name="source">          </param>
        /// <param name="connection">       </param>
        /// <param name="tableName">     【 NULL       】</param>
        /// <param name="bulkCopyTimeout">      </param>
        /// <param name="batchSize">         【   0         】     【        ,          。     ,   BatchSize     1000   ,         。    ,         (     2000、4000 ),         。  ,       1000,          (  500),        。】</param>
        /// <param name="options">      </param>
        /// <param name="externalTransaction">       </param>
        /// <returns>    </returns>
        public static async Task<int> BulkCopyAsync<TModel>(this SqlConnection connection,
          IEnumerable<TModel> source,
          string tableName = null,
          int bulkCopyTimeout = 30,
          int batchSize = 0,
          SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
          SqlTransaction externalTransaction = null)
        {
          //     
          using (var reader = new EnumerableReader<TModel>(source))
          {
            //        
            using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
            {
              //    
              copy.DestinationTableName = tableName ?? typeof(TModel).Name;
              //         
              copy.BatchSize = batchSize;
              //    
              copy.BulkCopyTimeout = bulkCopyTimeout;
              //      【                  ,            】【  :                    】
              foreach (var column in ModelToDataTable<TModel>.Columns)
              {
                //      
                copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
              }
              //          
              await copy.WriteToServerAsync(reader);
              //        
              return reader.Depth;
            }
          }
        }
      }
    カプセル化したローズマリーデータリーダー
    
     /// <summary>
      ///         
      /// </summary>
      /// <typeparam name="TModel">    </typeparam>
      public class EnumerableReader<TModel> : IDataReader
      {
        /// <summary>
        ///           
        /// </summary>
        /// <param name="source">   </param>
        public EnumerableReader(IEnumerable<TModel> source)
        {
          _source = source ?? throw new ArgumentNullException(nameof(source));
          _enumerable = source.GetEnumerator();
        }
    
        private readonly IEnumerable<TModel> _source;
        private readonly IEnumerator<TModel> _enumerable;
        private object[] _currentDataRow = Array.Empty<object>();
        private int _depth;
        private bool _release;
    
        public void Dispose()
        {
          _release = true;
          _enumerable.Dispose();
        }
    
        public int GetValues(object[] values)
        {
          if (values == null) throw new ArgumentNullException(nameof(values));
          var length = Math.Min(_currentDataRow.Length, values.Length);
          Array.Copy(_currentDataRow, values, length);
          return length;
        }
    
        public int GetOrdinal(string name)
        {
          for (int i = 0; i < ModelToDataTable<TModel>.Columns.Count; i++)
          {
            if (ModelToDataTable<TModel>.Columns[i].ColumnName == name) return i;
          }
    
          return -1;
        }
    
        public long GetBytes(int ordinal, long dataIndex, byte[] buffer, int bufferIndex, int length)
        {
          if (dataIndex < 0) throw new Exception($"        0!");
          if (bufferIndex < 0) throw new Exception("             0!");
          if (length < 0) throw new Exception("        0!");
          var numArray = (byte[])GetValue(ordinal);
          if (buffer == null) return numArray.Length;
          if (buffer.Length <= bufferIndex) throw new Exception("                    !");
          var freeLength = Math.Min(numArray.Length - bufferIndex, length);
          if (freeLength <= 0) return 0;
          Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);
          return freeLength;
        }
    
        public long GetChars(int ordinal, long dataIndex, char[] buffer, int bufferIndex, int length)
        {
          if (dataIndex < 0) throw new Exception($"        0!");
          if (bufferIndex < 0) throw new Exception("             0!");
          if (length < 0) throw new Exception("        0!");
          var numArray = (char[])GetValue(ordinal);
          if (buffer == null) return numArray.Length;
          if (buffer.Length <= bufferIndex) throw new Exception("                    !");
          var freeLength = Math.Min(numArray.Length - bufferIndex, length);
          if (freeLength <= 0) return 0;
          Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);
          return freeLength;
        }
    
        public bool IsDBNull(int i)
        {
          var value = GetValue(i);
          return value == null || value is DBNull;
        }
        public bool NextResult()
        {
          //        
          if (!_enumerable.MoveNext()) return false;
          //  +1
          Interlocked.Increment(ref _depth);
          //     
          _currentDataRow = ModelToDataTable<TModel>.ToRowData.Invoke(_enumerable.Current);
          return true;
        }
    
        public byte GetByte(int i) => (byte)GetValue(i);
        public string GetName(int i) => ModelToDataTable<TModel>.Columns[i].ColumnName;
        public string GetDataTypeName(int i) => ModelToDataTable<TModel>.Columns[i].DataType.Name;
        public Type GetFieldType(int i) => ModelToDataTable<TModel>.Columns[i].DataType;
        public object GetValue(int i) => _currentDataRow[i];
        public bool GetBoolean(int i) => (bool)GetValue(i);
        public char GetChar(int i) => (char)GetValue(i);
        public Guid GetGuid(int i) => (Guid)GetValue(i);
        public short GetInt16(int i) => (short)GetValue(i);
        public int GetInt32(int i) => (int)GetValue(i);
        public long GetInt64(int i) => (long)GetValue(i);
        public float GetFloat(int i) => (float)GetValue(i);
        public double GetDouble(int i) => (double)GetValue(i);
        public string GetString(int i) => (string)GetValue(i);
        public decimal GetDecimal(int i) => (decimal)GetValue(i);
        public DateTime GetDateTime(int i) => (DateTime)GetValue(i);
        public IDataReader GetData(int i) => throw new NotSupportedException();
        public int FieldCount => ModelToDataTable<TModel>.Columns.Count;
        public object this[int i] => GetValue(i);
        public object this[string name] => GetValue(GetOrdinal(name));
        public void Close() => Dispose();
        public DataTable GetSchemaTable() => ModelToDataTable<TModel>.ToDataTable(_source);
        public bool Read() => NextResult();
        public int Depth => _depth;
        public bool IsClosed => _release;
        public int RecordsAffected => 0;
      }
    モデルオブジェクトからデータ行ツール類へ
    
    /// <summary>
      ///      DataTable   
      /// </summary>
      /// <typeparam name="TModel">    </typeparam>
      public static class ModelToDataTable<TModel>
      {
        static ModelToDataTable()
        {
          //                 
          var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray();
          Columns = new ReadOnlyCollection<DataColumn>(propertyList
            .Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray());
          //          
          ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList);
        }
    
        /// <summary>
        ///           
        /// </summary>
        /// <param name="type">    </param>
        /// <param name="propertyList">     </param>
        /// <returns>       </returns>
        private static Func<TModel, object[]> BuildToRowDataDelegation(Type type, PropertyInfo[] propertyList)
        {
          var source = Expression.Parameter(type);
          var items = propertyList.Select(property => ConvertBindPropertyToData(source, property));
          var array = Expression.NewArrayInit(typeof(object), items);
          var lambda = Expression.Lambda<Func<TModel, object[]>>(array, source);
          return lambda.Compile();
        }
    
        /// <summary>
        ///         
        /// </summary>
        /// <param name="source">   </param>
        /// <param name="property">    </param>
        /// <returns>         </returns>
        private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property)
        {
          var propertyType = property.PropertyType;
          var expression = (Expression)Expression.Property(source, property);
          if (propertyType.IsEnum)
            expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType());
          return Expression.Convert(expression, typeof(object));
        }
    
        /// <summary>
        ///       
        /// </summary>
        /// <param name="type">    </param>
        /// <returns>    </returns>
        private static Type GetDataType(Type type)
        {
          //             
          if (type.IsEnum)
            return type.GetEnumUnderlyingType();
          //    
          if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
            return GetDataType(type.GetGenericArguments().First());
          return type;
        }
    
        /// <summary>
        ///    
        /// </summary>
        public static IReadOnlyList<DataColumn> Columns { get; }
    
        /// <summary>
        ///         
        /// </summary>
        public static Func<TModel, object[]> ToRowData { get; }
    
        /// <summary>
        ///      DataTable
        /// </summary>
        /// <param name="source">  </param>
        /// <param name="tableName">   </param>
        /// <returns>     DataTable</returns>
        public static DataTable ToDataTable(IEnumerable<TModel> source, string tableName = "TempTable")
        {
          //     
          var table = new DataTable(tableName);
          //   
          foreach (var dataColumn in Columns)
          {
            table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType));
          }
    
          //         
          foreach (var item in source)
          {
            table.Rows.Add(ToRowData.Invoke(item));
          }
    
          //     
          return table;
        }
      }
    三、パッケージコードをテストする
    1.テストコード
    コードを作る
    
    CREATE TABLE [dbo].[Person](
    	[Id] [BIGINT] NOT NULL,
    	[Name] [VARCHAR](64) NOT NULL,
    	[Age] [INT] NOT NULL,
    	[CreateTime] [DATETIME] NULL,
    	[Sex] [INT] NOT NULL,
    PRIMARY KEY CLUSTERED 
    (
    	[Id] ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    エンティティコード
    定義されたエンティティの属性名は、GetValue列の名前タイプに対応する必要があります。
    
    public class Person
    {
      public long Id { get; set; }
      public string Name { get; set; }
      public int Age { get; set; }
      public DateTime? CreateTime { get; set; }
      public Gender Sex { get; set; }
    }
    
    public enum Gender
    {
      Man = 0,
      Woman = 1
    }
    テスト方法
    
    //  10    
    var persons = new Person[100000];
    var random = new Random();
    for (int i = 0; i < persons.Length; i++)
    {
      persons[i] = new Person
      {
        Id = i + 1,
        Name = "  " + i,
        Age = random.Next(1, 128),
        Sex = (Gender)random.Next(2),
        CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i)
      };
    }
    
    //       
    using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
    {
      conn.Open();
      var sw = Stopwatch.StartNew();
      //      
      var qty = conn.BulkCopy(persons);
      sw.Stop();
      Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms");
    }
    一括挿入結果の実行
    226.4767 ms
    任意のボタンを押して続けてください。

    四、コードのダウンロード
    GitHubコードアドレス:https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents