使用EmguCV的多线程C#代码中的死锁 [英] Deadlock in a multithreaded C# code which uses EmguCV

查看:136
本文介绍了使用EmguCV的多线程C#代码中的死锁的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

你好
我编写了一个C#代码,该代码调用EmguCV方法来提取图像的特征点.
当我想提高程序速度时,我将其设置为多线程.
我使用了一个名为NUMBER_OF_CONCURRENT_THREADS的变量来保持可以同时完成的并发线程数.例如,在具有4个核心的cpu的系统中,我将此数字设置为3(一个主(控制器)线程和3个额外的线程).我还使用了信号灯进行同步.这些线程.

我的问题是死锁,在我运行程序时会不确定地发生.

我的代码如下:

Hello
I have wrote a C# code which calls EmguCV methods to extract feature points of images.
As i want to increase speed of program, i made it multithreaded.
I used a variable named NUMBER_OF_CONCURRENT_THREADS to keep number of concurrent threads which can be done simultaneously. for example, in a system with a cpu that have 4 cores, i set this number to be 3(one main(controller) thread and 3 extra ones). I also used a semaphore to sync. these threads.

My problem is deadlock which occurred non-deterministically when i run program.

my code is as follows:

const int NUMBER_OF_CONCURRENT_THREADS = 3;//other than main thread

Semaphore findDesctriptorSemaphore = new Semaphore(0, 10000);//empty at first

public void FindDesctriptors(int imageCount/*1-based images*/)
{
   if (imageCount <= 0)
      return;


   ParameterizedThreadStart pts = new ParameterizedThreadStart(FindDesctriptor);

   int q = 0;
   for (; q < imageCount; q++)
   {
      if (q >= (NUMBER_OF_CONCURRENT_THREADS))
         findDesctriptorSemaphore.WaitOne();

      //Parallel computing
      Thread t = new Thread(pts);
      t.Name = "desc " + (q + 1).ToString();
      t.Priority = ThreadPriority.Highest;
      t.Start(q + 1);
   }


   if (imageCount > NUMBER_OF_CONCURRENT_THREADS)
      //wait for all thread to complete
      for (int i = 0; i < NUMBER_OF_CONCURRENT_THREADS; i++)
         findDesctriptorSemaphore.WaitOne();
   else
      //wait for all thread to complete
      for (int i = 0; i < q; i++)
         findDesctriptorSemaphore.WaitOne();
}


void FindDesctriptor(object param/*1-based*/)
{
   int id = (int)param;

   Image<Gray, Byte> img = new Image<Gray, Byte>(id.ToString() + ".jpg");

   SURFFeature[] features = Extract_SURF_Features(img)/*a function that only calls emgucv methods to extract feature points*/;

   descs[id - 1] = features;

   findDesctriptorSemaphore.Release();//once

}



有人可以帮我吗?



Can anyone help me, please?

推荐答案

我将使用两个ResetEvent's信号量似乎是导致您出现问题的原因.切换导致死锁,除非您向逻辑中添加一些信号,否则您将无能为力.

与使用信号来解决此问题相比,一种简单的方法是利用两个ResetEvents提供所需的有状态性.

实际上,我也已经成功实现了不使用ResetEvents的代码,但确实使用了ThreadPool和锁定,这似乎也可以工作...

以我的WebServer类为例.

在单个ListenHandler上接收到请求:
ListenHandler.aspx [ WebServer.cs [ HttpContextAdapter.cs [ WebClient.cs [ WebServerAPI.js [ WebMouse.cs [
I would use two ResetEvent''s it seems the semaphore is what is causing you problems. The switching is causing the deadlock and there is nothing you can do unless you add some signals to the logic.

An easier approach than using signals to fix this is to utilize two ResetEvents to provide the statefullness you require.

I actually have also successfully implemented code which does not use the ResetEvents but does use the ThreadPool and locking and that also seems to work...

Take my WebServer class for example.

Requests are received on a single ListenHandler:
ListenHandler.aspx[View Source]

The ListenHandler then brokers the requests either using the ThreadPool or the current thread intelligently.

The requests are brokered to the WebServer class:
WebServer.cs[View Source]

At that point a HttpContextAdapter class is utilized to make various types of HttpRequests ubiquitous across the implementation.
HttpContextAdapter.cs[View Source]

Once there is an adapter created a WebClient is created to represent the client and the request on the WebServer.

WebClient.cs[View Source]

Clients basically interact with the browser and the following JavaScript API is utilized to allow the actions to be sent to the server and thus back to other clients if required.

WebServerAPI.js[View Source]

WebMouse is one of my favorites because it allows multiple WebClients (users) to share mouse interactions on a single WebPage or even an application across the web using the WebServer.

WebMouse.cs[View Source]

I even have WebChat''s and WebShare''s which hold WebObject''s. The WebMouse class is derived from WebObject and is usually stored in a WebShare.

If you need any further assistance please disambiguate your question and we will do our best to help!


我无事可做,所以我写了一个简单的示例来帮助您利用AsyncResult模式作为锁定.

I had nothing to do so I wrote a quick example to help you utilize the AsyncResult pattern as well as locking.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections;

namespace CodeProjectHelpProject
{
    //An example program
    class Program
    {
        /// <summary>
        /// A small class to derive from exception to ensure filtering is managable.
        /// </summary>
        class OperatingException : Exception
        {
            public OperatingException(string ex) : base(ex) { }
            public OperatingException(string ex, Exception innerEx) : base(ex, innerEx) { }
        }

        /// <summary>
        /// A small class to utilize during a program which will assist in multi threading
        /// </summary>
        class OperationManager
        {
            /// <summary>
            /// The history of operation results sorted by date and time
            /// </summary>
            public readonly SortedDictionary<datetime,> ResultHistory = new SortedDictionary<datetime,>();

            /// <summary>
            /// The currently executing operation
            /// </summary>
            public OperationResult CurrentOperation { get; private set; }

            /// <summary>
            /// Begins the process of executing a new operation on this instance of the OperationManager
            /// </summary>
            /// <param name="newOperation">The class which is passes for state observation and added to the ResultHistory upon any action</param>
            /// <param name="cancelExisting">A signal which indicates if there is a operation already executing to cancel the operation</param>
            /// <param name="resumePoint">A WaitCallback which may be used to give signal to external code</param>
            public void Begin(OperationResult newOperation, bool cancelExisting = false, WaitCallback resumePoint = null)
            {
                //If there is a CurrentOperation and there is no operation which is expected to be canceled indicate so by throwing an OperatingException.
                if (CurrentOperation != null && !cancelExisting) throw new OperatingException("The Operation Manager does not support multiple operations and there is currently an operation pending");
                else
                {
                    //Obtain a lock around this OperationManager
                    lock (this)
                    {
                        //Lock the CurrentOperation
                        lock (CurrentOperation)
                        {
                            //If there is a value assigned to the CurrentOperation and it is not completed
                            if (CurrentOperation != null && !CurrentOperation.Signal)
                            {
                                //Cancel the CurrentOperation
                                CurrentOperation.Cancel();                                
                            }
                        }

                        //Add the result to history
                        AddHistoryResult(CurrentOperation);

                        //Assign the newOperation to the CurrentOperation
                        CurrentOperation = newOperation;

                        //Create a scope to enclose the exposed logic
                        try
                        {
                            //Begin the CurrentOperation
                            CurrentOperation.Begin();
                        }
                        catch
                        {
                            //On exception of the resumePoint is not null
                            if (resumePoint != null)
                            {
                                //Add the history result
                                AddHistoryResult(newOperation);

                                //Invoke it passing the CurrentOperation
                                resumePoint.Invoke(CurrentOperation);

                                //Set the CurrentOperation to nil.
                                CurrentOperation = null;
                            }
                        }
                    }
                }
            }

            private void AddHistoryResult(OperationResult what, DateTime? when = null)
            {
                //If there is no what to add return
                if (what == null) return;
                try
                {
                    //Guard when to it's value or DateTime.UtcNow
                    when = when ?? DateTime.UtcNow;
                    //Add the result to the history statefully
                    IDictionary securePointer = ResultHistory;
                    lock (securePointer.SyncRoot)
                    {
                        //Attempt to add what at when
                        ResultHistory.Add(when.Value, what);
                    }
                    //Remove the reference
                    securePointer = null;
                }
                catch
                {
                    //Catch and propagate the exception
                    throw;
                }
            }

        }

        /// <summary>
        /// A small class which is useful for storing operating results on.
        /// Currently it only contains the Signal Member which is used in this example to display completeness of the operation in question.
        /// </summary>
        sealed class OperationResult : IAsyncResult
        {
            /// <summary>
            /// The manager in which this result was created for
            /// </summary>
            private OperationManager manager;

            public OperationResult(OperationManager manager)
            {
                // TODO: Complete member initialization
                this.manager = manager;
            }
            public bool Signal { get; set; }

            /// <summary>
            /// Do something here if desired
            /// </summary>
            internal void Cancel()
            {
                lock (this)
                {
                    this.Signal = true;
                }
            }

            /// <summary>
            /// Called when the Operation Begins
            /// </summary>
            internal void Begin()
            {
                lock (this)
                {
                    if (this.Signal) return;
                    else
                    {
                        try
                        {
                            //Call another method or override in a derived class etc.
                            new Thread(() =>
                            {
                                //Do work in here and add to this object statefully
                                this.Signal = true;
                                return;
                            }).Start();
                        }
                        catch(Exception ex)
                        {
                            //Pass to the propagation mechanism
                            PropagateException(ex);
                        }
                    }
                }
            }

            /// <summary>
            /// Does the needful thing
            /// </summary>
            /// <param name="ex">The exception</param>
            private void PropagateException(Exception ex)
            {
                throw new OperatingException("An exception occured", ex);
            }

            public object AsyncState
            {
                get { throw new NotImplementedException(); }
            }

            public WaitHandle AsyncWaitHandle
            {
                get { throw new NotImplementedException(); }
            }

            public bool CompletedSynchronously
            {
                get { throw new NotImplementedException(); }
            }

            public bool IsCompleted
            {
                get { return !Signal; }
            }
        }

        /// <summary>
        /// Logic for cancelation will occur in this function.
        /// This function may be called from inside any function and the invocation of this function should indicate the current action is being canceled.
        /// </summary>
        /// <param name="parameter"></param>
        static void OnCancel(object parameter)
        {
            //Whatever you need
        }

        static void AfterProcessing(IAsyncResult result)
        {
            //Checks and sh*t
        }

        static void Main(string[] args)
        {
            //Create an OperationManager for this static execution
            OperationManager manager = new OperationManager();

            //We create a callback which can be invoked from inside the logic, the logic points to the OnCancel Method.
            WaitCallback resumePoint = new WaitCallback(OnCancel);
            
            //We create a result to indicate when the operation has completed
            var result = new OperationResult(manager);

            //Begin the operation
            manager.Begin(result, false, resumePoint);

            //Save the old thread priority
            ThreadPriority oldPriority = Thread.CurrentThread.Priority; 

            //While there is no signal
            while (result.Signal)
            {
                //Reduce priority
                Thread.CurrentThread.Priority = ThreadPriority.Lowest;
                //Do a big wait which is different depending on the year
                Thread.SpinWait((int)DateTime.Now.Ticks / DateTime.UtcNow.Year);
            }

            //Set the oldPriority to the currentThread
            Thread.CurrentThread.Priority = oldPriority;

            //Do work after processing
            AfterProcessing(result);

            //An example of this could work without the OperationManager
            //resumePoint.BeginInvoke(resumePoint, new AsyncCallback(AfterProcessing), result);

            //It would utilize the ThreadPool
            //ThreadPool.QueueUserWorkItem(resumePoint);

        }
    }
}



让我知道你的喜好!

它不是100%完整的,但它应该为您构建合适的同步设计模式提供合适的基础.

例如,由于创建线程的内在内存开销,我要做的一件事是利用ThreadPool而不是Threads,但是您的外部代码可能需要线程本地存储,并且我可以看到这是有好处的,因为使用快速调用的旧代码有多少已执行.

另一个方法是将resumePoint移到OperationResult,然后在需要时从Begin或任何其他Method调用它.现在,我有目的地完成了分离,因为我希望您在使用代码时学到一两个东西.

示例中显示了很多内容,如果您还有其他问题,您将确切知道在哪里提问!

真诚的
V//



Let me know how you like it!

It is not 100% complete but it should provide you a suitable base to construct a properly synchronized design pattern.

For instance one thing I would do is utilize the ThreadPool rather than Threads due to the intrinsic memory overhead of creating threads however your external code may require thread local storage and I can see how this may be beneficial due to how old code using fast calls are executed.

Another would be to move the resumePoint to the OperationResult and then invocate it from the Begin or any other Method when required. Right now there is separation which I have purposefully done because I want you to learn a thing or two as you work with the code.

There is a lot being shown in the example and if you have further questions you know exactly where to ask them!

Sincerely,
V//


我立即可以看到的问题是,您允许多个线程调用同一个库.虽然我不熟悉所讨论的库,但我想说的是,当您释放信号量时,未管理的代码仍在其他线程中执行.如果库正在使用共享的本地内存,则可能是导致您出现问题的原因.
我看到的另一件事是,如前所述释放信号量时,其他线程仍在其范围内执行(托管和未管理的执行清理),如果不仔细观察,这会导致内存泄漏,最重要的是会导致运行时损坏对象由于析构函数调用,它们跨应用程序边界被封送.

我将利用一个类,并通过引用"ref"传递该类,这将确保本地旧版代码可以像在本机执行中一样对对象进行更改.

我将实现IDisposable,并利用示例中给出的IAsyncResult.

如果您没有向我们提供与正在接收的错误有关的更多信息,或者可能没有给我们提供更多代码来运行并查看结果,那么我此时只能做些模糊的推测.

我可以提出的一种对您有帮助的立即解决方案是,将文件分解为几个更易于管理的块.

每个块使用一个线程,并将每个块的额外数据插入一个新块,该块可以在后处理之前运行.

例如

从文件创建字节[].

将Byte []拆分为大块.

让Remainder = new List< byte []>();

设置存储(例如字典或列表)

调用库.

检查剩余插槽中是否有每个片段遗留的多余Byte []数据,再次排序并运行过程.

最后检查存储结果.

然后,用户实际上可以在操作过程中窥视存储空间,而不会干扰搜索操作.

v//
The problem I can immediately see is that you are allowing multiple threads to invoke the same library. While I am not familiar with the library in question I would say that when you release the semaphore the unman-aged code is still executing in other threads. If the library is utilizing shared local memory then this is possibly what is causing you problems.

Another thing I see is that when the semaphore is released as previously stated there other threads are still executing withing their scope (managed and unmangaged performing cleanup), this causes memory leaks if not watched carefully and on top of that causes the runtime to mangle objects which are being marshaled across application boundaries due to destructor invocation.

I would utilize a class and pass the class by reference "ref" this will ensure that the local legacy code can make changes to the object just like it would in native execution.

I would implement IDisposable and I would utilize IAsyncResult as given in the example.

Without you giving us more information relevant to the errors you are receiving and possibly giving us more code to run and see the results all I can do at this time is vaguely speculate.

One immediate solution I can pose which would definitely help you is to break the file down into several more manageable chunks.

Use a thread per chunk and interleave the extra data per chunk to a new chunk which can be run before post processing.

E.g.

Create Byte[] From File.

Split Byte[] into chunks.

Let Remainder = new List<byte[]>();

Setup storage(e.g. Dictionary or List)

Invoke library.

Check Remainder slots for extra Byte[] data left over from each fragment, sort and run process again.

Finally check storage for results.

Users can then actually peek into storage during operation and not interfere with the search operation.

v//


这篇关于使用EmguCV的多线程C#代码中的死锁的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆