多线程实体框架:未关闭连接.连接的当前状态为连接中 [英] Multithreading Entity Framework: The connection was not closed. The connection's current state is connecting

查看:114
本文介绍了多线程实体框架:未关闭连接.连接的当前状态为连接中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

因此,我有一个执行工作流程的Windows服务进程.后端在Entity Framework的顶部使用Repository和UnitofWork Pattern以及Unity和从edmx生成的实体类.我将不必赘述很多细节,但是工作流基本上要经过5个步骤.特定过程可能在任何时间点的任何阶段(按顺序排列).步骤1只是为步骤2生成数据,该步骤将通过长时间运行的过程来验证数据到另一台服务器.然后,在该步骤中生成带有该数据的pdf.对于每个阶段,我们都会生成一个计时器,但是可以配置为允许为每个阶段生成一个以上的计时器.这就提出了问题.当我将处理器添加到特定阶段时,它会随机出现以下错误:

So I have a windows service process that performs a workflow process. The back end uses Repository and UnitofWork Pattern and Unity on top of Entity Framework with the entities class generated from the edmx. I won't go into a whole lot of detail as its not necessary but basically there are 5 steps that the workflow goes through. A particular process might be at any stage at any point in time (in order of course). Step one just generates data for step two, which validates the data via a long running process to another server. Then step there generates a pdf with that data. For each stage we spawn a timer, however it is configurable to allow more than one timer to be spawned for each stage. Therein lays the problem. When I add a processor to a particular stage, it the following error randomly:

连接未关闭.连接的当前状态为正在连接.

The connection was not closed. The connection's current state is connecting.

仔细阅读这似乎很明显,这是因为上下文正在尝试从两个线程访问同一实体.但这就是让我陷入循环的地方.我可以找到的所有信息都表明,每个线程应该使用一个实例上下文.据我所知,这是我正在做的事情(请参见下面的代码).我没有使用单例模式或静态变量或任何东西,因此我不确定是否会发生这种情况或如何避免这种情况.我已在下面发布了我的代码的相关部分,以供您查看.

Reading up on this it seems obvious that this is happening because the context is trying to access the same entity from two threads. But this is where it is kind of throwing me for a loop. All of the information I can find on this states that we should be using a instance context per thread. Which as far as I can tell I am doing (see the code below). I am not using singleton pattern or statics or anything so I am not really sure why this is happening or how to avoid it. I have posted the relevant bits of my code below for your review.

基础存储库:

 public class BaseRepository
{
    /// <summary>
    /// Initializes a repository and registers with a <see cref="IUnitOfWork"/>
    /// </summary>
    /// <param name="unitOfWork"></param>
    public BaseRepository(IUnitOfWork unitOfWork)
    {
        if (unitOfWork == null) throw new ArgumentException("unitofWork");
        UnitOfWork = unitOfWork;
    }


    /// <summary>
    /// Returns a <see cref="DbSet"/> of entities.
    /// </summary>
    /// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam>
    /// <returns></returns>
    protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class
    {

        return Context.Set<TEntity>();
    }

    /// <summary>
    /// Sets the state of an entity.
    /// </summary>
    /// <param name="entity">object to set state.</param>
    /// <param name="entityState"><see cref="EntityState"/></param>
    protected virtual void SetEntityState(object entity, EntityState entityState)
    {
        Context.Entry(entity).State = entityState;
    }

    /// <summary>
    /// Unit of work controlling this repository.       
    /// </summary>
    protected IUnitOfWork UnitOfWork { get; set; }

    /// <summary>
    /// 
    /// </summary>
    /// <param name="entity"></param>
    protected virtual void Attach(object entity)
    {
        if (Context.Entry(entity).State == EntityState.Detached)
            Context.Entry(entity).State = EntityState.Modified;
    }

    protected virtual void Detach(object entity)
    {
        Context.Entry(entity).State = EntityState.Detached;
    }

    /// <summary>
    /// Provides access to the ef context we are working with
    /// </summary>
    internal StatementAutoEntities Context
    {
        get
        {                
            return (StatementAutoEntities)UnitOfWork;
        }
    }
}

StatementAutoEntities是自动生成的EF类.

StatementAutoEntities is the autogenerated EF class.

存储库实现:

public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository
{

    /// <summary>
    /// Creates a new repository and associated with a <see cref="IUnitOfWork"/>
    /// </summary>
    /// <param name="unitOfWork"></param>
    public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork)
    {
    }

    /// <summary>
    /// Create a new <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="Queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Create(ProcessingQueue Queue)
    {
        GetDbSet<ProcessingQueue>().Add(Queue);
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Updates a <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Update(ProcessingQueue queue)
    {
        //Attach(queue);
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Delete a <see cref="ProcessingQueue"/> entry in database
    /// </summary>
    /// <param name="Queue">
    ///     <see cref="ProcessingQueue"/>
    /// </param>
    public void Delete(ProcessingQueue Queue)
    {
        GetDbSet<ProcessingQueue>().Remove(Queue);  
        UnitOfWork.SaveChanges();
    }

    /// <summary>
    /// Gets a <see cref="ProcessingQueue"/> by its unique Id
    /// </summary>
    /// <param name="id"></param>
    /// <returns></returns>
    public ProcessingQueue GetById(int id)
    {
        return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault();
    }

    /// <summary>
    /// Gets a list of <see cref="ProcessingQueue"/> entries by status
    /// </summary>
    /// <param name="status"></param>
    /// <returns></returns>
    public IList<ProcessingQueue> GetByStatus(int status)
    {
        return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList();
    }

    /// <summary>
    /// Gets a list of all <see cref="ProcessingQueue"/> entries
    /// </summary>
    /// <returns></returns>
    public IList<ProcessingQueue> GetAll()
    {
        return (from e in Context.ProcessingQueue_Select() select e).ToList();
    }

    /// <summary>
    /// Gets the next pending item id in the queue for a specific work        
    /// </summary>
    /// <param name="serverId">Unique id of the server that will process the item in the queue</param>
    /// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param>
    /// <param name="operationId">if defined only operations of the type indicated are considered.</param>
    /// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns>
    public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId)
    {
        var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId,  operationId).SingleOrDefault();
        return id.HasValue ? id.Value : -1;
    }

    /// <summary>
    /// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all
    /// active entries in the queue
    /// </summary>
    /// <returns></returns>
    public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries()
    {
        return (from e in Context.ProcessingQueueStatus_Select() select e).ToList();
    }
    /// <summary>
    /// Bumps an entry to the front of the queue 
    /// </summary>
    /// <param name="processingQueueId"></param>
    public void Bump(int processingQueueId)
    {
        Context.ProcessingQueue_Bump(processingQueueId);
    }
}

我们使用Unity进行依赖项注入,例如一些调用代码:

We use Unity for dependency injection, some calling code for example:

#region Members
    private readonly IProcessingQueueRepository _queueRepository;       
    #endregion

    #region Constructors
    /// <summary>Initializes ProcessingQueue services with repositories</summary>
    /// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>        
    public ProcessingQueueService(IProcessingQueueRepository queueRepository)
    {
        Check.Require(queueRepository != null, "processingQueueRepository is required");
        _queueRepository = queueRepository;

    }
    #endregion

启动计时器的Windows服务中的代码如下:

The code in the windows service that kicks off the timers is as follows:

            _staWorkTypeConfigLock.EnterReadLock();
        foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o) 
                              let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval 
                              select new StaTimer
                            {
                                Interval = _runImmediate ? 5000 : interval*1000,
                                Operation = (ProcessingQueue.RequestedOperation) operation.OperationId
                            })
        {
            timer.Elapsed += ApxQueueProcessingOnElapsedInterval;
            timer.Enabled = true;
            Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);                
        }
        _staWorkTypeConfigLock.ExitReadLock();

StaTimer只是计时器添加操作类型的包装.然后ApxQueueProcessingOnElapsedInterval基本上只是根据操作将工作分配给流程.

StaTimer is just a wrapper on timer adding operation type. ApxQueueProcessingOnElapsedInterval then bascially just assigns work to the process based on the operation.

我还将在我们生成任务的地方添加一些ApxQueueProcessingOnElapsedInterval代码.

I will also add a bit of the ApxQueueProcessingOnElapsedInterval code where we are spawning tasks.

            _staTasksLock.EnterWriteLock();
        for (var x = 0; x < tasksNeeded; x++)
        {
            var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj),
                             CreateQueueProcessConfig(true, operation), _cancellationToken);


            _staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t));

            t.Start();
            Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table
        }
        _staTasksLock.ExitWriteLock();

推荐答案

像服务,存储库和上下文之类的东西应该在应用程序的整个生命周期中都有效,但这是不正确的.您可以同时触发多个计时器.这意味着多个线程将并行使用您的服务,并且它们将在其线程中执行服务的代码=上下文在多个线程之间共享=>异常,因为上下文不是线程安全的.

Looks like your service, repository and context are supposed to live for the whole life time of your application but that is incorrect. You can have multiple timers triggered at the same time. That means multiple threads will use your service in parallel and they will execute the code of your service in their thread = context is shared among multiple threads => exception because context is not thread safe.

唯一的选择是对要执行的每个操作使用新的上下文实例.例如,您可以将您的类更改为接受上下文工厂而不是上下文,并为每个操作获取新的上下文.

The only option is to use a new context instance for each operation you want to execute. You can for example change your classes to accept context factory instead of context and get a new context for each operation.

这篇关于多线程实体框架:未关闭连接.连接的当前状态为连接中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆