为什么Parallel.Foreach创造无限的主题? [英] Why does Parallel.Foreach create endless threads?

查看:111
本文介绍了为什么Parallel.Foreach创造无限的主题?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在code以下继续创建线程,即使在队列empty..until最终出现的内存不足异常。如果我更换一个普通的foreach的Parallel.ForEach,这不会发生。任何人都知道的原因,这可能发生吗?

 公共委托无效DataChangedDelegate(DataItem的OBJ);

公共类消费
{
    公共DataChangedDelegate OnCustomerChanged;
    公共DataChangedDelegate OnOrdersChanged;

    私人CancellationTokenSource CTS;
    私人的CancellationToken CT;
    私人BlockingCollection< D​​ataItem的>队列;

    公共消费(BlockingCollection< D​​ataItem的>队列){
        this.queue =队列;
        开始();
    }

    私人无效启动(){
        CTS =新CancellationTokenSource();
        克拉= cts.Token;
        Task.Factory.StartNew(()=>的DoWork(),CT);
    }

    私人无效的DoWork(){

        Parallel.ForEach(queue.GetConsumingPartitioner(),项目=> {
            如果(item.DataType == DataTypes.Customer){
                OnCustomerChanged(项目);
            }否则,如果(item.DataType == DataTypes.Order){
                OnOrdersChanged(项目);
            }
        });
    }
}
 

解决方案

我觉得 Parallel.ForEach()制成,主要用于处理界的集合。而且它不会像期望通过 GetConsumingPartitioner(),返回的集合,其中的MoveNext()块长时间。

问题是, Parallel.ForEach()试图找到并行的最好的程度,所以它开始尽可能多的工作 S作为的TaskScheduler 让它运行。但的TaskScheduler 看到有很多工作 s表示需要很长的时间才能完成,而且他们不是做什么(他们阻止),所以它不断启动新的。

我认为最好的办法是设置 MaxDegreeOfParallelism

作为一种替代方法,如果你愿意用pre释放code,可以使用TPL数据流的 ActionBlock 。在这种情况下的主要区别是, ActionBlock 不会阻止任何线程当没有要处理的物品,所以线程的数目不会得到接近极限的任何地方。

The code below continues to create threads, even when the queue is empty..until eventually an OutOfMemory exception occurs. If i replace the Parallel.ForEach with a regular foreach, this does not happen. anyone know of reasons why this may happen?

public delegate void DataChangedDelegate(DataItem obj);

public class Consumer
{
    public DataChangedDelegate OnCustomerChanged;
    public DataChangedDelegate OnOrdersChanged;

    private CancellationTokenSource cts;
    private CancellationToken ct;
    private BlockingCollection<DataItem> queue;

    public Consumer(BlockingCollection<DataItem> queue) {
        this.queue = queue;
        Start();
    }

    private void Start() {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        Task.Factory.StartNew(() => DoWork(), ct);
    }

    private void DoWork() {

        Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
            if (item.DataType == DataTypes.Customer) {
                OnCustomerChanged(item);
            } else if(item.DataType == DataTypes.Order) {
                OnOrdersChanged(item);
            }
        });
    }
}

解决方案

I think Parallel.ForEach() was made primarily for processing bounded collections. And it doesn't expect collections like the one returned by GetConsumingPartitioner(), where MoveNext() blocks for a long time.

The problem is that Parallel.ForEach() tries to find the best degree of parallelism, so it starts as many Tasks as the TaskScheduler lets it run. But the TaskScheduler sees there are many Tasks that take a very long time to finish, and that they're not doing anything (they block) so it keeps on starting new ones.

I think the best solution is to set the MaxDegreeOfParallelism.

As an alternative, if you are willing to use pre-release code, you could use TPL Dataflow's ActionBlock. The main difference in this case is that ActionBlock doesn't block any threads when there are no items to process, so the number of threads wouldn't get anywhere near the limit.

这篇关于为什么Parallel.Foreach创造无限的主题?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆