如何取消相互依赖的并行任务的数据 [英] How do I cancel parallel tasks that are dependent on each other for data

查看:105
本文介绍了如何取消相互依赖的并行任务的数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

大家好,



我对取消任务有疑问。请阅读以下说明。任何建议都可以帮我解决问题。



我有3个任务。让我们称之为task1,task2,& TASK3。除数据外,它们中的每一个都是独立的。即task1将数据放入queue1,task2读取它。 task2将一些其他已处理的数据放入queue2,task3读取它。 task3做了一些最后的工作。此链应该继续,直到task1成功完成。完成task1后,在处理各自的队列后,task2和task3应该会成功完成。如果由于任何原因任务停止在中间(例如由于任务正在运行的方法中的异常),那么其他两个任务也应该立即停止。



为了成功完成,我使用整数变量。我在task1完成时递增它。其他两个任务读取它并在清空自己的队列后顺利退出。



对于错误的退出,我可以使用相同的整数变量。但我正在寻找一个更好的解决方案,以便代码看起来更干净。



请参阅下面的代码:



我通过在Method2中抛出异常来模拟错误。发生此错误后,task2停止,task1运行完成,task3无限等待队列中的更多数据,从而阻止应用程序完成。



有没有办法任务出现故障/完成时通知其他任务?

例如

1.当task1出现故障/完成时通知task2和task3

2.当task2出现故障/完成时通知task1和task3

3.通知task1任务3出现故障/完成时的任务2



Hi All,

I have a question regarding cancellation of tasks. Please read below for the explanation. Any suggestion would help me solve the problem.

I have 3 tasks. Lets call them task1, task2, & task3. Each of them are independent except for the data. i.e. task1 puts data into a queue1 and task2 reads it. task2 puts some other processed data into queue2 and task3 reads it. task3 does some final work. This chain should continue till task1 completes successfully. Upon completion of task1 and after processing their respective queues task2 and task3 should run into successful completion. If for any reason a task stops in-between (say due to an exception in the method the task is running), then the other two tasks should also stop immediately.

For successful completion I am using an integer variable. I increment it when task1 completes. The other 2 tasks read it and exit smoothly after emptying their own queues.

For erroneous exits I can use the same integer variable. But I am looking at a better solution so that code looks cleaner.

See the code below:

I have simulated error by throwing exception in Method2. After this error occurs, task2 stops, task1 runs to completion and task3 infinitely waits for more data in the queue thus blocking the application to finish.

Is there a way to notify other tasks when a task has faulted/completed?
E.g.
1. Notify task2 and task3 when task1 faults/completes
2. Notify task1 and task3 when task2 faults/completes
3. Notify task1 and task2 when task3 faults/completes

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;
using System.Xml.XPath;
using System.Diagnostics;

namespace Designer.UpdateManager.Dependency
{
	public class StringDependencyCalculator
	{
		#region Fields
        private ConcurrentQueue<int> queue1 = new ConcurrentQueue<int>();
		private ConcurrentQueue<string> queue2 = new ConcurrentQueue<string>();
		private int foundAllDependencies = 0;
		private int convertedAllDependencies = 0;
		private int maxLimit;
		#endregion

        #region Constructor

        public static void Main(string[] args)
		{
			if(args.Length != 1)
			{
				Console.WriteLine("Invalid no. of arguments");
				Console.WriteLine("Usage: TaskParallelization.exe <maxLimit>");
			}

			int maxLimit = 0;
			if(int.TryParse(args[0], out maxLimit))
			{
				StringDependencyCalculator sdc = new StringDependencyCalculator();
				sdc.CalculateDependency(maxLimit);
			}
			else
			{
				Console.WriteLine("Invalid no. of arguments");
				Console.WriteLine("Usage: TaskParallelization.exe <maxLimit>");
			}
		}

		#endregion

        #region Methods

        public void CalculateDependency(int maxLimit)
		{
			this.maxLimit = maxLimit;
			Stopwatch stopWatch = new Stopwatch();
			stopWatch.Start();

			try
			{
				Task[] tasks = new Task[]
				{
					Task.Factory.StartNew(() => Method1()),
                    Task.Factory.StartNew(() => Method2()),
                    Task.Factory.StartNew(() => Method3())
				};

				Task.WaitAll(tasks);
			}
			catch (AggregateException ex)
			{
				StringBuilder messageStringBuilder = new StringBuilder();
				foreach (Exception exception in ex.InnerExceptions)
				{
					messageStringBuilder.AppendLine(exception.Message);
				}

				Console.WriteLine(messageStringBuilder.ToString());
			}

			finally
			{
				stopWatch.Stop();
				Console.WriteLine("Elapsed time - " + (stopWatch.ElapsedMilliseconds / 1000) + "s");
				stopWatch = null;
			}
		}

		private void Method1()
		{
			Random r = new Random();

			for (int i = 0; i < maxLimit; i++)
			{
				queue1.Enqueue(i * r.Next(100));
			}

			Interlocked.Increment(ref foundAllDependencies);
		}

		private void Method2()
		{
			for (int i = 0; i < maxLimit; i++)
			{
				int dequedItem = 0;
				while (!queue1.TryDequeue(out dequedItem))
				{
					// Spin to find any element.
					if (foundAllDependencies > 0 && !queue1.Any())
						break;
				}

				queue2.Enqueue(dequedItem.ToString());

				// Simulate erroneous condition
				if(i == 10)
					throw new InvalidDataException("10 is not a valid value");
			}

			Interlocked.Increment(ref convertedAllDependencies);
		}

		private void Method3()
		{
			for (int i = 0; i < maxLimit; i++)
			{
				string dequedItem = string.Empty;
				while (!queue2.TryDequeue(out dequedItem))
				{
					// Spin to find any element.
					if (foundAllDependencies > 0 && convertedAllDependencies > 0 && !queue2.Any())
						break;
				}

				if(string.IsNullOrEmpty(dequedItem))
					continue;

				Console.WriteLine(dequedItem);
			}
		}
		
		#endregion
    }
}

推荐答案





您要做的就是完成。但是在这个快速回答部分无法回答。这是一个非常广泛的话题。以下是一些帮助您理解概念并实施解决方案的文章。



基于上一个任务的结果处理任务

< a href =http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx> http://blogs.msdn.com/b /pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx [ ^ ]



连续多项任务连续

https:// msdn.microsoft.com/en-us/library/dd537612(v=vs.110).aspx [ ^ ]



任务取消

http://blogs.msdn.com/b/csharpfaq/archive/2010/07/19/parallel-programming-task-cancellation.aspx [ ^ ]
Hi,

What you are trying to do can be done. But it cannot be answered in this quick answers section. It is a pretty vast topic. Here are some articles that help you understand the concept and implement a solution.

Process task based on the outcome of the previous task
http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx[^]

Chain Multiple Tasks with Continuations
https://msdn.microsoft.com/en-us/library/dd537612(v=vs.110).aspx[^]

Task cancellation
http://blogs.msdn.com/b/csharpfaq/archive/2010/07/19/parallel-programming-task-cancellation.aspx[^]


这篇关于如何取消相互依赖的并行任务的数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆