Windows WF 4.5中的集合的真正并发 [英] True concurrency on a collection in Windows WF 4.5

查看:255
本文介绍了Windows WF 4.5中的集合的真正并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在修改先前编码的现有Windows Workflow Foundation项目,以便同步运行所有内容。

我有:



在工作流程中我有一个父序列工作流程,其中包含几个基本的工作流程,基本上设置几个服务,并准备运行。 我随后拥有大部分工作流程,其中包含一个ForEach工作流程,该工作流程对大约15000个项目进行操作,每个项目需要大约1-3秒的时间(时间大约为70%CPU ,10%网络延迟,20%数据库查询/访问)。显然这需要WAYYYY太长。我需要将这个时间提高大约5倍(需要大约5-6小时,需要大约1小时)



Delima:



在这个项目之前我从来没有使用过Windows Workflow,所以我非常不熟悉如何实现集合上并发执行的简单实现。



想法:



我阅读了不同的工作流活动, ParallelForEach 工作流活动可能是要走的路。我的想法是,我只是转换我的ForEach工作流活动与ParallelForEach工作流活动,并实现Parallel.Foreach()在任务并行库中工作的方式的并发。不幸的是,这似乎不是如何实现ParallelForEach工作流活动。 ParallelForEach工作流活动似乎只是将每个迭代放在一个堆栈中,并且几乎同步地对它们进行操作,而不是将多个线程上的每个集合的工作安排在多个线程上并进行上下文切换,除非工作流的主体Idle(我不认为它是与I / O上的等待相同的东西)似乎是一个显式的状态,需要在每个MSDN的工作流活动上设置:


ParallelForEach枚举它的值并为
的每个枚举值计算Body,它只计划Body。body
的执行方式取决于
如果Body不
去空闲,它以相反的顺序执行,因为调度的
活动被处理为堆栈,最后计划的活动
执行例如,如果您在
ParallelForEach中有一个{1,2,3,4}的集合,并使用WriteLine作为正文写入值
out。您在控制台中打印出4,3,2,1。这是因为
WriteLine不会空闲,所以在4 WriteLine活动得到
后,他们使用堆栈行为(首先是最后一个)执行。



但是如果你有活动在身体可以去空闲,如
接收活动或延迟活动。然后没有必要等待
他们完成。 ParallelForEach转到下一个计划的主体
活动,并尝试执行它。如果该活动也空闲,
ParallelForEach再次移动下一个body活动。




当使用ParallelForEach工作流活动运行我的想法时,我的运行时间与正常的ForEach Workflow Activity大致相同。我正在考虑使底层的BeginWorkflow方法异步,但我不知道这将是一个好主意或不与Windows WF如何操作。



我需要您的帮助:



有没有人对我如何实现我想达到的结果有任何建议?有没有另一种方法来实现的东西,将尽可能多的线程并行执行foreach工作流的主体?我有8个逻辑处理器,我想利用所有的感觉每个迭代的集合是独立于其他。



任何想法??

解决方案

工作流运行时是单线程的。要真正做并行工作,你必须管理自己的线程(不知何故)。我的猜测是,你的活动只是在Execute方法中执行他们的东西,并且运行时一次只允许一个Execute。



这里是NonblockingNativeActivity类的代码。它对我们有用,我希望它也帮助你。使用此作为您的活动的基类,而不是覆盖Execute,覆盖ExecuteNonblocking。如果您需要使用工作流运行时,您还可以覆盖PrepareToExecute和AfterExecute,但是这些将是单线程的。

 使用系统。文本; 
using System.Activities.Hosting;
using System.Activities;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;

命名空间Sample.Activities
{
///< summary>
///类非阻塞本地活动
///< / summary>
public abstract class NonblockingNativeActivity:NativeActivity
{
private Variable< NoPersistHandle> NoPersistHandle {get;组; }
private Variable< Bookmark>书签{get;组; }

private任务m_Task;
private Bookmark m_Bookmark;
private BookmarkResumptionHelper m_BookmarkResumptionHelper;

///< summary>
///允许活动诱导空闲。
///< / summary>
protected override bool CanInduceIdle
{
get
{
return true;
}
}

///< summary>
///执行准备
///< / summary>
///< param name =context>< / param>
protected virtual void PrepareToExecute(
NativeActivityContext context)
{
}

///< summary>
///执行非阻塞活动
///< / summary>
protected abstract void ExecuteNonblocking();

///< summary>
///执行完成后
///< / summary>
///< param name =context>< / param>
protected virtual void AfterExecute(
NativeActivityContext context)
{
}

///< summary>
///执行活动
///< / summary>
///< param name =context>< / param>
protected override void Execute(NativeActivityContext context)
{

//
//我们必须输入一个NoPersist区域,因为它看起来像我们闲置,而我们的
//任务正在执行,但我们不是真的
//
NoPersistHandle noPersistHandle = NoPersistHandle.Get(context);
noPersistHandle.Enter(context);

//
//设置我们将在任务完成时恢复的书签
//
m_Bookmark = context.CreateBookmark(BookmarkResumptionCallback);
this.Bookmark.Set(context,m_Bookmark);
m_BookmarkResumptionHelper = context.GetExtension< BookmarkResumptionHelper>();

//
//准备执行
//
PrepareToExecute(context);

//
//开始一个任务来执行我们的活动
//
CancellationTokenSource tokenSource = new CancellationTokenSource();
m_Task = Task.Factory.StartNew(ExecuteNonblocking,tokenSource.Token);
m_Task.ContinueWith(TaskCompletionCallback);
}

private void TaskCompletionCallback(任务任务)
{
if(!task.IsCompleted)
{
task.Wait ;
}

//
//恢复书签
//
m_BookmarkResumptionHelper.ResumeBookmark(m_Bookmark,null);
}


private void BookmarkResumptionCallback(NativeActivityContext上下文,书签书签,对象值)
{
var noPersistHandle = NoPersistHandle.Get(context);

if(m_Task.IsFaulted)
{
//
//任务有问题
//
Console.WriteLine Exception from ExecuteNonBlocking task:);
异常ex = m_Task.Exception;
while(ex!= null)
{
Console.WriteLine(ex.Message);
ex = ex.InnerException;
}

//
//如果有一个异常退出无持久句柄并重新抛出。
//
if(m_Task.Exception!= null)
{
noPersistHandle.Exit(context);
throw m_Task.Exception;
}
}

AfterExecute(context);

noPersistHandle.Exit(context);
}

//
// TODO:我们如何处理取消?我们可以通过一个CancellationToekn任务
//,以便我们取消任务,但也许我们可以做得更好吗?
//
///< summary>
///中止活动
///< / summary>
///< param name =context>< / param>
protected override void Abort(NativeActivityAbortContext context)
{
base.Abort(context);
}

///< summary>
///取消活动
///< / summary>
///< param name =context>< / param>
protected override void Cancel(NativeActivityContext context)
{
base.Cancel(context);
}

///< summary>
///注册活动元数据
///< / sum>
///< param name =metadata>< / param>
protected override void CacheMetadata(NativeActivityMetadata metadata)
{
base.CacheMetadata(metadata);
this.NoPersistHandle = new Variable this.Bookmark = new Variable< Bookmark>();
metadata.AddImplementationVariable(this.NoPersistHandle);
metadata.AddImplementationVariable(this.Bookmark);
metadata.RequireExtension< BookmarkResumptionHelper>();
metadata.AddDefaultExtensionProvider< BookmarkResumptionHelper>(()=> new BookmarkResumptionHelper());
}
}
}


I am modifying an existing Windows Workflow Foundation project that was previous coded to run everything synchronously. However, as the data set grew this needed to change to meet performance requirements.

What I have:

Inside the workflow I have a parent Sequence Workflow that contains a few elementary workflows that basically set a few services up and prepares them to run. I then have the bulk of the workflow's work, which consists of a ForEach Workflow that operates on a collection of about 15000 items that take about 1-3 seconds per item to process (timings are around 70% CPU, 10% network latency, 20% database querying/access). Obviously this takes WAYYYY too long. I need to improve this time by about a factor of 5 (takes around 5-6 hours, need to get to about 1 hour)

Delima:

I have never worked with Windows Workflows before this project so I very unfamiliar with how to achieve otherwise simple implementations of concurrent execution on a collection.

Ideas:

I read about the different Workflow Activities and decided that a ParallelForEach Workflow Activity would probably be the way to go. My idea was that I would just switch out my ForEach Workflow Activity with the ParallelForEach Workflow activity and achieve concurrency in the way the Parallel.Foreach() works in the Task Parallel Library. Unfortunately, that does not seem to be how the ParallelForEach Workflow Activity is implemented. Instead of scheduling the work to be done on each collection across multiple threads and context switching when another thread was waiting, the ParallelForEach Workflow Activity seems to just put each iteration in a stack and operates on them almost syncrounously, unless the body of the workflow is "Idle" (which I do not believe is the same thing as "waiting" on I/O. It seems to be an explicit state that needs to be set on a workflow activity-per MSDN:

ParallelForEach enumerates its values and schedules the Body for every value it enumerates on. It only schedules the Body. How the body executes depends on whether the Body goes idle. If the Body does not go idle, it executes in a reverse order because the scheduled activities are handled as a stack, the last scheduled activity executes first. For example, if you have a collection of {1,2,3,4}in ParallelForEach and use a WriteLine as the body to write the value out. You have 4, 3, 2, 1 printed out in the console. This is because WriteLine does not go idle so after 4 WriteLine activities got scheduled, they executed using a stack behavior (first in last out).

But if you have activities in the Body that can go idle, like a Receive activity or Delay activity. Then there is no need to wait for them to complete. ParallelForEach goes to the next scheduled body activity and try to execute it. If that activity goes idle too, ParallelForEach moves on again the next body activity.

Where I am now:

When running my "idea" above with the ParallelForEach Workflow Activity, I achieve about the same running time as the normal ForEach Workflow Activity. I was considering making the underlying BeginWorkflow method async but I'm not sure if that will be a good idea or not with how Windows WF operates.

I need your help:

Does anyone have any suggestions on how I can achieve the results that I am trying to get to? Is there another way to implement something that would execute the body of the foreach workflow in parallel on as many threads as possible? I have 8 logical processor and I want to take advantage of all of them sense each iteration of the collection is independent from the others.

Any Ideas??

解决方案

The Workflow runtime is single threaded. To truly do parallel work, you have to manage your own threads (somehow). My guess is that your activities are simply doing their thing in the Execute method and the runtime will only allow one Execute at a time.

Here is the code for a NonblockingNativeActivity class. It has been useful for us, I hope it helps you as well. Use this as the base class for your activities, instead of overriding Execute, override ExecuteNonblocking. You can also override PrepareToExecute and AfterExecute if you need to work with the Workflow runtime but, those will be single threaded.

using System.Text;
using System.Activities.Hosting;
using System.Activities;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading;

namespace Sample.Activities
{
    /// <summary>
    /// Class Non-Blocking Native Activity
    /// </summary>
    public abstract class NonblockingNativeActivity : NativeActivity
    {
        private Variable<NoPersistHandle> NoPersistHandle { get; set; }
        private Variable<Bookmark> Bookmark { get; set; }

        private Task m_Task;
        private Bookmark m_Bookmark;
        private BookmarkResumptionHelper m_BookmarkResumptionHelper;

        /// <summary>
        /// Allows the activity to induce idle. 
        /// </summary>
        protected override bool CanInduceIdle
        {
            get
            {
                return true;
            }
        }

        /// <summary>
        /// Prepars for Execution
        /// </summary>
        /// <param name="context"></param>
        protected virtual void PrepareToExecute(
            NativeActivityContext context)
        {
        }

        /// <summary>
        /// Executes a Non-blocking Activity
        /// </summary>
        protected abstract void ExecuteNonblocking();

        /// <summary>
        /// After Execution Completes
        /// </summary>
        /// <param name="context"></param>
        protected virtual void AfterExecute(
            NativeActivityContext context)
        {
        }

        /// <summary>
        /// Executes the Activity
        /// </summary>
        /// <param name="context"></param>
        protected override void Execute(NativeActivityContext context)
        {

            //
            //  We must enter a NoPersist zone because it looks like we're idle while our
            //  Task is executing but, we aren't really
            //
            NoPersistHandle noPersistHandle = NoPersistHandle.Get(context);
            noPersistHandle.Enter(context);

            //
            //  Set a bookmark that we will resume when our Task is done
            //
            m_Bookmark = context.CreateBookmark(BookmarkResumptionCallback);
            this.Bookmark.Set(context, m_Bookmark);
            m_BookmarkResumptionHelper = context.GetExtension<BookmarkResumptionHelper>();

            //
            //  Prepare to execute
            //
            PrepareToExecute(context);

            //
            //  Start a Task to do the actual execution of our activity
            //
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            m_Task = Task.Factory.StartNew(ExecuteNonblocking, tokenSource.Token);
            m_Task.ContinueWith(TaskCompletionCallback);
        }

        private void TaskCompletionCallback(Task task)
        {
            if (!task.IsCompleted)
            {
                task.Wait();
            }

            //
            //  Resume the bookmark
            //
            m_BookmarkResumptionHelper.ResumeBookmark(m_Bookmark, null);
        }


        private void BookmarkResumptionCallback(NativeActivityContext context, Bookmark bookmark, object value)
        {
            var noPersistHandle = NoPersistHandle.Get(context);

            if (m_Task.IsFaulted)
            {
                //
                //  The task had a problem
                //
                Console.WriteLine("Exception from ExecuteNonBlocking task:");
                Exception ex = m_Task.Exception;
                while (ex != null)
                {
                    Console.WriteLine(ex.Message);
                    ex = ex.InnerException;
                }

                //
                // If there was an exception exit the no persist handle and rethrow.
                //
                if (m_Task.Exception != null)
                {
                    noPersistHandle.Exit(context);
                    throw m_Task.Exception;
                }
            }

            AfterExecute(context);

            noPersistHandle.Exit(context);
        }

        //
        //  TODO: How do we want to handle cancelations?  We can pass a CancellationToekn to the task
        //  so that we cancel the task but, maybe we can do better than that?
        //
        /// <summary>
        /// Abort Activity
        /// </summary>
        /// <param name="context"></param>
        protected override void Abort(NativeActivityAbortContext context)
        {
            base.Abort(context);
        }

        /// <summary>
        /// Cancels the Activity
        /// </summary>
        /// <param name="context"></param>
        protected override void Cancel(NativeActivityContext context)
        {
            base.Cancel(context);
        }

        /// <summary>
        /// Registers Activity Metadata
        /// </summary>
        /// <param name="metadata"></param>
        protected override void CacheMetadata(NativeActivityMetadata metadata)
        {
            base.CacheMetadata(metadata);
            this.NoPersistHandle = new Variable<NoPersistHandle>();
            this.Bookmark = new Variable<Bookmark>();
            metadata.AddImplementationVariable(this.NoPersistHandle);
            metadata.AddImplementationVariable(this.Bookmark);
            metadata.RequireExtension<BookmarkResumptionHelper>();
            metadata.AddDefaultExtensionProvider<BookmarkResumptionHelper>(() => new BookmarkResumptionHelper());
        }
    }
}

这篇关于Windows WF 4.5中的集合的真正并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆