Microsoft.EntityFrameworkCore.EntityFrameworkQueryableExtensions.ForEachAsync< T>()的意外行为 [英] Unexpected behaviour with Microsoft.EntityFrameworkCore.EntityFrameworkQueryableExtensions.ForEachAsync<T>()

查看:72
本文介绍了Microsoft.EntityFrameworkCore.EntityFrameworkQueryableExtensions.ForEachAsync< T>()的意外行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

以下是复制步骤.下面的程序使用.Net Core控制台应用程序和EF Core将10,000行从一个SQL表复制到另一个SQL表.该程序以100个批次插入记录,(这很重要!)它会为每个插入创建一个新的DbContext实例.

Here are the steps to reproduce. The below program copies 10,000 rows from one SQL table to another using .Net Core console app and EF Core. The program inserts records in 100 batches, and (this is important!) it creates a new instance of DbContext for each insert.

1)创建SQL Server数据库以及"Froms"和"Tos"表:

1) Create SQL Server database, and the "Froms" and "Tos" tables:

create table Froms (
    Id int identity(1, 1) not null,
    Guid [uniqueidentifier] not null,

    constraint [PK_Froms] primary key clustered (Id asc)
)
go

create table Tos (
    Id int not null,
    Guid [uniqueidentifier] not null,

    constraint [PK_Tos] primary key clustered (Id asc)
)
go

2)填充发件人"表:

2) Populate the "Froms" table:

set nocount on
declare @i int = 0

while @i < 10000
begin
    insert Froms (Guid)
    values (newid())

    set @i += 1
end
go

3)创建名称为 TestForEachAsync 的.Net Core控制台应用程序项目.将C#的版本更改为7.1或更高版本( async Main 必需).添加 Microsoft.EntityFrameworkCore.SqlServer nuget程序包.

3) Create .Net Core console app project with the name TestForEachAsync. Change version of C# to 7.1 or later (required for async Main). Add Microsoft.EntityFrameworkCore.SqlServer nuget package.

4)创建课程:

数据库实体

using System;

namespace TestForEachAsync
{
    public class From
    {
        public int Id { get; set; }
        public Guid Guid { get; set; }
    }
}


using System;

namespace TestForEachAsync
{
    public class To
    {
        public int Id { get; set; }
        public Guid Guid { get; set; }
    }
}

DbContext

using Microsoft.EntityFrameworkCore;

namespace TestForEachAsync
{
    public class Context : DbContext
    {
        public DbSet<From> Froms { get; set; }
        public DbSet<To> Tos { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer("YOUR_CONNECTION_STRING");
        }
    }
}

主要

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

namespace TestForEachAsync
{
    internal class Program
    {
        private static async Task Main(string[] args)
        {
            //Get the "froms"
            var selectContext = new Context();
            var froms = selectContext.Froms.Select(f => new { f.Id, f.Guid });

            int count = 0;
            Task<int> saveChangesTask = null;
            Context insertContext = new Context();
            Context prevInsertContext = null;

            //Iterate through "froms"
            await froms.ForEachAsync(
                async f =>
                {
                    //Add instace of "to" to the context
                    var to = new To { Id = f.Id, Guid = f.Guid };
                    await insertContext.Tos.AddAsync(to);
                    count++;

                    //If another 100 of "to"s has been added to the context...
                    if (count % 100 == 0)
                    {
                        //Wait for the previous 100 "to"s to finish saving to the database
                        if (saveChangesTask != null)
                        {
                            await saveChangesTask;
                        }

                        //Start saving the next 100 "to"s
                        saveChangesTask = insertContext.SaveChangesAsync();

                        //Dispose of the context that was used to save previous 100 "to"s
                        prevInsertContext?.Dispose();

                        //Reassign the context used to save the current 100 "to"s to a "prev" variable,
                        //and set context variable to the new Context instance.
                        prevInsertContext = insertContext;
                        insertContext = new Context();
                    }
                }
            );

            //Wait for second last 100 "to"s to finish saving to the database
            if (saveChangesTask != null)
            {
                await saveChangesTask;
            }

            //Save the last 100 "to"s to the database
            await insertContext.SaveChangesAsync();
            insertContext.Dispose();

            Console.WriteLine("Done");
            Console.ReadKey();
        }
    }
}

5)运行应用程序-您将获得异常该连接不支持MultipleActiveResultSets .似乎正在 insertContext 上启动多个操作,尽管我不知道为什么.

5) Run the app - you get an exception The connection does not support MultipleActiveResultSets. Looks like multiple operations are being started on insertContext, though I do not see why.

6)我找到了解决此问题的两种方法:

6) I found two ways to fix the issue:

  • 将" await froms.ForEachAsync(...)"循环替换为普通"循环 foreach(var in ins){...} ,或者
  • 在异步循环中,将 await saveChangesTask; 替换为 saveChangesTask.Wait();
  • Replace the await froms.ForEachAsync(...) loop with "normal" loop foreach (var f in froms) {...}, or
  • Inside the async loop, replace await saveChangesTask; with saveChangesTask.Wait();

但是有人可以解释一下为什么原始代码无法按我期望的那样工作吗?

But can someone explain please why the original code does not work as I expect?

注意:如果您多次运行该应用程序,请不要忘记在每次运行前截断"Tos"表.

Note: if you run the app multiple times, do not forget to truncate the "Tos" table before each run.

推荐答案

您陷入了将异步lambda传递给期望委托返回void( Action< T> 特定情况下),如 Stephen Toub 所述"https://blogs.msdn.microsoft.com/pfxteam/2012/02/08/potential-pitfalls-to-avoid-when-passing-around-async-lambdas/" rel ="noreferrer">要避免的潜在陷阱传递异步lambda时.这实际上等同于使用 async void 的陷阱,因为您的异步代码根本不是 await -ed,从而破坏了其内部逻辑.

You are falling into typical trap of passing async lambda to a method which expects delegate that returns void (Action<T> in this particular case), as described by Stephen Toub in Potential pitfalls to avoid when passing around async lambdas. It's really an equivalent of using async void with it's pitfalls, because your async code is simply not await-ed, thus breaking it's internal logic.

该解决方案通常是一个特殊的重载,它接受 Func< T,Task> ,而不是 Action< T> .可能应该是EF Core提供的(您可以考虑发布一个请求),但是现在您可以自己用以下方式实现它:

The solution is as usual a special overload which accepts Func<T, Task> instead of Action<T>. Probably it should have been provided by EF Core (you may consider posting a request for that), but for now you can implement it yourself with something like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Extensions.Internal;

namespace Microsoft.EntityFrameworkCore
{
    public static class AsyncExtensions
    {
        public static Task ForEachAsync<T>(this IQueryable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default) =>
            source.AsAsyncEnumerable().ForEachAsync(action, cancellationToken);

        public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
        {
            using (var asyncEnumerator = source.GetEnumerator())
                while (await asyncEnumerator.MoveNext(cancellationToken))
                    await action(asyncEnumerator.Current);
        }
    }
}

基本上是 EF核心实现,并添加了 action await .

which is basically the EF Core implementation with added await of the action.

执行完此操作后,您的代码将解析为该方法,并且一切都会按预期进行.

Once you do that, your code will resolve to this method and everything should work as expected.

这篇关于Microsoft.EntityFrameworkCore.EntityFrameworkQueryableExtensions.ForEachAsync&lt; T&gt;()的意外行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆