いくつかのデータベースの大規模なデータの大量挿入を実現します。
19507 ワード
Sql Serverサポートデータの大量挿入だけは知っていましたが、Oracle、SQLite、MySQLもサポートしていますが、OracleはOrace.DataAccesドライバを使う必要があります。今日はいくつかのデータベースの一括挿入解決方法を貼り付けます。
まず、IProviderに大量挿入を実現するためのプラグインサービスインターフェースIBacher Providerがあります。このインターフェースは前の記事ですでに述べました。
Sql Serverの大量挿入は簡単で、Sql BulkCopyを使えばいいです。以下はこの種類の実現です。
二、Oracleデータ一括挿入
System.Data.OracleClientは大量挿入に対応していないので、Oracle.DataAccessコンポーネントを提供者として使用するしかないです。
三、SQLiteデータ一括挿入
SQLiteのロット挿入は、トランザクションを開くだけでいいです。この具体的な原理は分かりません。
五、テスト
次にテスト用の例を書いて、大量挿入の効果を見てみます。
データベース
時間を費やす
Ms Sql
00:00:02.376300
Oracle
00:00:01.51559
SQLite
00:00:01.675634
MySql
00:00:05.45166891
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。
まず、IProviderに大量挿入を実現するためのプラグインサービスインターフェースIBacher Providerがあります。このインターフェースは前の記事ですでに述べました。
/// <summary>
/// 。
/// </summary>
public interface IBatcherProvider : IProviderService
{
/// <summary>
/// <see cref="DataTable"/> 。
/// </summary>
/// <param name="dataTable"> <see cref="DataTable"/>。</param>
/// <param name="batchSize"> 。</param>
void Insert(DataTable dataTable, int batchSize = 10000);
}
一、Sql Serverデータ一括挿入Sql Serverの大量挿入は簡単で、Sql BulkCopyを使えばいいです。以下はこの種類の実現です。
/// <summary>
/// System.Data.SqlClient 。
/// </summary>
public sealed class MsSqlBatcher : IBatcherProvider
{
/// <summary>
/// 。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// <see cref="DataTable"/> 。
/// </summary>
/// <param name="dataTable"> <see cref="DataTable"/>。</param>
/// <param name="batchSize"> 。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
//
var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName);
using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null)
{
DestinationTableName = tableName,
BatchSize = batchSize
})
{
// , bulk
dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement);
bulk.WriteToServer(dataTable);
bulk.Close();
}
}
catch (Exception exp)
{
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
}
以上は事務を使用していません。使用事務は性能に影響があります。事務を使うなら、Sql BulkCopyOptions.UseInternal Transationを設定できます。二、Oracleデータ一括挿入
System.Data.OracleClientは大量挿入に対応していないので、Oracle.DataAccessコンポーネントを提供者として使用するしかないです。
/// <summary>
/// Oracle.Data.Access 。
/// </summary>
public sealed class OracleAccessBatcher : IBatcherProvider
{
/// <summary>
/// 。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// <see cref="DataTable"/> 。
/// </summary>
/// <param name="dataTable"> <see cref="DataTable"/>。</param>
/// <param name="batchSize"> 。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if (command == null)
{
throw new BatcherException(new ArgumentException("command"));
}
command.Connection = connection;
command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);
command.ExecuteNonQuery();
}
}
catch (Exception exp)
{
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
/// <summary>
/// sql 。
/// </summary>
/// <param name="database"></param>
/// <param name="command"></param>
/// <param name="table"></param>
/// <returns></returns>
private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
{
var names = new StringBuilder();
var values = new StringBuilder();
// DataTable
var data = table.ToArray();
// ArrayBindCount
command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null);
var syntax = database.Provider.GetService<ISyntaxProvider>();
for (var i = 0; i < table.Columns.Count; i++)
{
var column = table.Columns[i];
var parameter = database.Provider.DbProviderFactory.CreateParameter();
if (parameter == null)
{
continue;
}
parameter.ParameterName = column.ColumnName;
parameter.Direction = ParameterDirection.Input;
parameter.DbType = column.DataType.GetDbType();
parameter.Value = data[i];
if (names.Length > 0)
{
names.Append(",");
values.Append(",");
}
names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName));
values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);
command.Parameters.Add(parameter);
}
return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
}
以上の最も重要なステップは、DataTableを配列の配列表現、すなわちobject[]に変換し、前の配列の上付きが列の個数であり、後の配列が行の個数であるため、循環Columnsは後の配列をParameeterの値とし、つまりパラメータの値は配列である。insert文は普通の挿入語句と違います。三、SQLiteデータ一括挿入
SQLiteのロット挿入は、トランザクションを開くだけでいいです。この具体的な原理は分かりません。
public sealed class SQLiteBatcher : IBatcherProvider
{
/// <summary>
/// 。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// <see cref="DataTable"/> 。
/// </summary>
/// <param name="dataTable"> <see cref="DataTable"/>。</param>
/// <param name="batchSize"> 。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = ServiceContext.Database.CreateConnection())
{
DbTransaction transcation = null;
try
{
connection.TryOpen();
transcation = connection.BeginTransaction();
using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if (command == null)
{
throw new BatcherException(new ArgumentException("command"));
}
command.Connection = connection;
command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable);
if (command.CommandText == string.Empty)
{
return;
}
var flag = new AssertFlag();
dataTable.EachRow(row =>
{
var first = flag.AssertTrue();
ProcessCommandParameters(dataTable, command, row, first);
command.ExecuteNonQuery();
});
}
transcation.Commit();
}
catch (Exception exp)
{
if (transcation != null)
{
transcation.Rollback();
}
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first)
{
for (var c = 0; c < dataTable.Columns.Count; c++)
{
DbParameter parameter;
// ,
if (first)
{
parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();
parameter.ParameterName = dataTable.Columns[c].ColumnName;
command.Parameters.Add(parameter);
}
else
{
parameter = command.Parameters[c];
}
parameter.Value = row[c];
}
}
/// <summary>
/// sql 。
/// </summary>
/// <param name="database"></param>
/// <param name="table"></param>
/// <returns></returns>
private string GenerateInserSql(IDatabase database, DataTable table)
{
var syntax = database.Provider.GetService<ISyntaxProvider>();
var names = new StringBuilder();
var values = new StringBuilder();
var flag = new AssertFlag();
table.EachColumn(column =>
{
if (!flag.AssertTrue())
{
names.Append(",");
values.Append(",");
}
names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));
values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);
});
return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
}
四、MySqlデータ一括挿入
/// <summary>
/// MySql.Data 。
/// </summary>
public sealed class MySqlBatcher : IBatcherProvider
{
/// <summary>
/// 。
/// </summary>
public ServiceContext ServiceContext { get; set; }
/// <summary>
/// <see cref="DataTable"/> 。
/// </summary>
/// <param name="dataTable"> <see cref="DataTable"/>。</param>
/// <param name="batchSize"> 。</param>
public void Insert(DataTable dataTable, int batchSize = 10000)
{
Checker.ArgumentNull(dataTable, "dataTable");
if (dataTable.Rows.Count == 0)
{
return;
}
using (var connection = ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if (command == null)
{
throw new BatcherException(new ArgumentException("command"));
}
command.Connection = connection;
command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);
if (command.CommandText == string.Empty)
{
return;
}
command.ExecuteNonQuery();
}
}
catch (Exception exp)
{
throw new BatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
/// <summary>
/// sql 。
/// </summary>
/// <param name="database"></param>
/// <param name="command"></param>
/// <param name="table"></param>
/// <returns></returns>
private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table)
{
var names = new StringBuilder();
var values = new StringBuilder();
var types = new List<DbType>();
var count = table.Columns.Count;
var syntax = database.Provider.GetService<ISyntaxProvider>();
table.EachColumn(c =>
{
if (names.Length > 0)
{
names.Append(",");
}
names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName));
types.Add(c.DataType.GetDbType());
});
var i = 0;
foreach (DataRow row in table.Rows)
{
if (i > 0)
{
values.Append(",");
}
values.Append("(");
for (var j = 0; j < count; j++)
{
if (j > 0)
{
values.Append(", ");
}
var isStrType = IsStringType(types[j]);
var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j);
if (parameter != null)
{
values.Append(parameter.ParameterName);
command.Parameters.Add(parameter);
}
else if (isStrType)
{
values.AppendFormat("'{0}'", row[j]);
}
else
{
values.Append(row[j]);
}
}
values.Append(")");
i++;
}
return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
}
/// <summary>
/// 。
/// </summary>
/// <param name="dbType"></param>
/// <returns></returns>
private bool IsStringType(DbType dbType)
{
return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;
}
/// <summary>
/// 。
/// </summary>
/// <param name="provider"></param>
/// <param name="isStrType"></param>
/// <param name="dbType"></param>
/// <param name="value"></param>
/// <param name="parPrefix"></param>
/// <param name="row"></param>
/// <param name="col"></param>
/// <returns></returns>
private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col)
{
// , , , ( ' )
if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime)
{
var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col);
var parameter = provider.DbProviderFactory.CreateParameter();
parameter.ParameterName = name;
parameter.Direction = ParameterDirection.Input;
parameter.DbType = dbType;
parameter.Value = value;
return parameter;
}
return null;
}
}
MySqlの一括挿入は、値をすべて文のvaluesに書いています。例えば、insert batch(id,name)values(1,'1',2',2',3',3',....10')。五、テスト
次にテスト用の例を書いて、大量挿入の効果を見てみます。
public void TestBatchInsert()
{
Console.WriteLine(TimeWatcher.Watch(() =>
InvokeTest(database =>
{
var table = new DataTable("Batcher");
table.Columns.Add("Id", typeof(int));
table.Columns.Add("Name1", typeof(string));
table.Columns.Add("Name2", typeof(string));
table.Columns.Add("Name3", typeof(string));
table.Columns.Add("Name4", typeof(string));
// 100000
for (var i = 0; i < 100000; i++)
{
table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());
}
// IBatcherProvider
var batcher = database.Provider.GetService<IBatcherProvider>();
if (batcher == null)
{
Console.WriteLine(" 。");
}
else
{
batcher.Insert(table);
}
// batcher
var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");
Console.WriteLine(" {0} ", database.ExecuteScalar(sql));
})));
}
以下の表には4つのデータベースが10万本のデータを生成するのに時間がかかります。データベース
時間を費やす
Ms Sql
00:00:02.376300
Oracle
00:00:01.51559
SQLite
00:00:01.675634
MySql
00:00:05.45166891
以上が本文の全部です。皆さんの勉強に役に立つように、私たちを応援してください。