批量插入时间比使用 Dapper 预期的要长

在阅读了 这篇文章之后,我决定仔细研究一下我使用 Dapper 的方式。

我在一个空数据库里运行了这个代码

var members = new List<Member>();
for (int i = 0; i < 50000; i++)
{
members.Add(new Member()
{
Username = i.toString(),
IsActive = true
});
}


using (var scope = new TransactionScope())
{
connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);


scope.Complete();
}

只花了20秒。每秒2500刀。不错,但也不是很好,考虑到博客达到了45k 插入/秒。在 Dapper 有更有效的方法吗?

另外,顺便说一下,通过 Visual Studio 调试器运行这段代码需要使用 超过3分钟!,我认为调试器会稍微降低它的速度,但是我真的很惊讶看到这么多。

更新

所以这个

using (var scope = new TransactionScope())
{
connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);


scope.Complete();
}

还有这个

    connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);

都花了20秒。

但这只花了4秒钟!

SqlTransaction trans = connection.BeginTransaction();


connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members, transaction: trans);


trans.Commit();
49199 次浏览

使用这种方法,我所能达到的最好成绩是在4秒内打出5万条记录

SqlTransaction trans = connection.BeginTransaction();


connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members, transaction: trans);


trans.Commit();

我最近偶然发现了这一点,并注意到 TransactionScope 是在连接打开之后创建的(我假设这一点是因为 Dappers Execute 不打开连接,不像 Query)。根据这里的答案 Q4: https://stackoverflow.com/a/2886326/455904,它不会导致将由 TransactionScope 处理的连接。我的同事做了一些快速测试,在 TransactionScope 之外打开连接大大降低了性能。

因此,改用以下方式应该会奏效:

// Assuming the connection isn't already open
using (var scope = new TransactionScope())
{
connection.Open();
connection.Execute(@"
insert Member(Username, IsActive)
values(@Username, @IsActive)", members);


scope.Complete();
}

对我来说最快的变种:

            var dynamicParameters = new DynamicParameters();
var selects = new List<string>();
for (var i = 0; i < members.Length; i++)
{
var member = members[i];
var pUsername = $"u{i}";
var pIsActive = $"a{i}";
dynamicParameters.Add(pUsername, member.Username);
dynamicParameters.Add(pIsActive, member.IsActive);
selects.Add("select @{pUsername},@{pIsActive}");
}
con.Execute($"insert into Member(Username, IsActive){string.Join(" union all ", selects)}", dynamicParameters);

它生成如下 sql:

INSERT TABLENAME (Column1,Column2,...)
SELECT @u0,@a0...
UNION ALL
SELECT @u1,@a1...
UNION ALL
SELECT @u2,@a2...

这个查询工作得更快,因为 sql 添加了一组行,而不是一次添加一行。瓶颈不是写入数据,而是写入在日志中所做的事情。

此外,还要研究最小日志记录事务的规则。

我发现这些例子都不完整。

下面是一些代码,它们可以在使用后正确地关闭连接,也可以根据这个线程中最新的和更好的答案正确地使用 transactionscope 来提高 Execute 性能。

using (var scope = new TransactionScope())
{
Connection.Open();
Connection.Execute(sqlQuery, parameters);


scope.Complete();
}

只使用一个 insert 语句的 Execute方法永远不会执行批量插入,也不会有效。即使是带有 Transaction的公认答案也不会做 Bulk Insert

如果要执行 Bulk Insert,请使用 SqlBulkCopy https://msdn.microsoft.com/en-us/library/system.data.sqlclient.sqlbulkcopy

你找不到比这更快的了。

衣冠楚楚

免责声明 : 我是该项目的所有者 < a href = “ http://Dapper-Plus.net/”rel = “ nofollow norefrer”> Dapper Plus

这个项目不是免费的,但提供所有批量操作:

  • BulkInsert
  • 批量更新
  • 大容量删除
  • BulkMerge

(在引擎盖下使用 SqlBulkCopy)

还有一些选项,比如输出身份值:

// CONFIGURE & MAP entity
DapperPlusManager.Entity<Order>()
.Table("Orders")
.Identity(x => x.ID);


// CHAIN & SAVE entity
connection.BulkInsert(orders)
.AlsoInsert(order => order.Items);
.Include(x => x.ThenMerge(order => order.Invoice)
.AlsoMerge(invoice => invoice.Items))
.AlsoMerge(x => x.ShippingAddress);

我们的图书馆支持多个供应商:

  • SQL Server
  • SQL 精简版
  • 神使
  • MySql
  • PostgreSQL
  • SQLite
  • 火鸟

我创建了一个扩展方法,它允许您非常快速地进行批量插入。

public static class DapperExtensions
{
public static async Task BulkInsert<T>(
this IDbConnection connection,
string tableName,
IReadOnlyCollection<T> items,
Dictionary<string, Func<T, object>> dataFunc)
{
const int MaxBatchSize = 1000;
const int MaxParameterSize = 2000;


var batchSize = Math.Min((int)Math.Ceiling((double)MaxParameterSize / dataFunc.Keys.Count), MaxBatchSize);
var numberOfBatches = (int)Math.Ceiling((double)items.Count / batchSize);
var columnNames = dataFunc.Keys;
var insertSql = $"INSERT INTO {tableName} ({string.Join(", ", columnNames.Select(e => $"[{e}]"))}) VALUES ";
var sqlToExecute = new List<Tuple<string, DynamicParameters>>();


for (var i = 0; i < numberOfBatches; i++)
{
var dataToInsert = items.Skip(i * batchSize)
.Take(batchSize);
var valueSql = GetQueries(dataToInsert, dataFunc);


sqlToExecute.Add(Tuple.Create($"{insertSql}{string.Join(", ", valueSql.Item1)}", valueSql.Item2));
}


foreach (var sql in sqlToExecute)
{
await connection.ExecuteAsync(sql.Item1, sql.Item2, commandTimeout: int.MaxValue);
}
}


private static Tuple<IEnumerable<string>, DynamicParameters> GetQueries<T>(
IEnumerable<T> dataToInsert,
Dictionary<string, Func<T, object>> dataFunc)
{
var parameters = new DynamicParameters();


return Tuple.Create(
dataToInsert.Select(e => $"({string.Join(", ", GenerateQueryAndParameters(e, parameters, dataFunc))})"),
parameters);
}


private static IEnumerable<string> GenerateQueryAndParameters<T>(
T entity,
DynamicParameters parameters,
Dictionary<string, Func<T, object>> dataFunc)
{
var paramTemplateFunc = new Func<Guid, string>(guid => $"@p{guid.ToString().Replace("-", "")}");
var paramList = new List<string>();


foreach (var key in dataFunc)
{
var paramName = paramTemplateFunc(Guid.NewGuid());
parameters.Add(paramName, key.Value(entity));
paramList.Add(paramName);
}


return paramList;
}
}

然后,要使用这种扩展方法,需要编写如下代码:

await dbConnection.BulkInsert(
"MySchemaName.MyTableName",
myCollectionOfItems,
new Dictionary<string, Func<MyObjectToInsert, object>>
{
{ "ColumnOne", u => u.ColumnOne },
{ "ColumnTwo", u => u.ColumnTwo },
...
});

这是非常原始的,还有进一步改进的空间,例如传入事务或 commandTimeout 值,但是对我来说它很有用。