Why does the localInit Func get called multiple times per thread in Parallel.ForEach

Question

I was writing some code to process a lot of data, and I thought it would be useful to have Parallel.ForEach create a file for each thread it creates so the output doesn't need to be synchronized (by me at least).

It looks like this:

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();

        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));

        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());

What I expected to happen was that up to 8 files would be created and would persist through the entire run time. Then each would be Disposed when the entire ForEach call finishes. What really happens is that the localInit seems to be called once for each item, so I end up with hundreds of files. The writers are also getting disposed at the end of each item that is processed.

This shows the same thing happening:

var vals = Enumerable.Range(0, 10000000).ToArray();
long sum = 0;
Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
    (i, state, common) =>
    {
        Thread.Sleep(10);
        return common + i;
    },
    (common) => Interlocked.Add(ref sum, common));

This is what I see:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

Note: if I leave out the Thread.Sleep call, it sometimes seems to function "correctly". localInit only gets called once each for the 4 threads that it decides to use on my pc. Not every time, however.

Is this the desired behavior of the function? What's going on behind the scenes that causes it to do this? And lastly, what's a good way to get my desired functionality, ThreadLocal?

This is on .NET 4.5, by the way.

Recommended answer

Parallel.ForEach does not work as you think it does. It's important to note that the method is built on top of the Task classes and that the relationship between Task and Thread is not 1:1. You can have, for example, 10 tasks that run on 2 managed threads.

Try using this line in your method body instead of the current one:

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                  Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

You should see that the same ThreadId is reused across many different tasks, which are identified by their unique ids. You'll see this more if you leave in, or increase, your call to Thread.Sleep.

The (very) basic idea of how Parallel.ForEach works is that it takes your enumerable and creates a series of tasks, each of which processes a section of the enumeration; how the work is divided depends a lot on the input. There is also some special logic that checks whether a task has been running for more than a certain number of milliseconds without completing. If so, a new task may be spawned to help relieve the work.

If you look at the documentation for the localInit parameter of Parallel.ForEach, you'll notice that it says it returns the initial state of the local data for each _task_, not each thread.
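To see the per-task behavior directly, here is a minimal sketch that counts how often localInit runs and how many distinct tasks and threads it runs on. The class name LocalInitDemo, the Run helper, and the item counts are made up for illustration; the exact numbers it prints will vary with the machine and the runtime version.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question or answer.
public static class LocalInitDemo
{
    // Runs a small loop and reports how many times localInit ran,
    // and how many distinct task ids and thread ids it ran on.
    public static (int InitCalls, int DistinctTasks, int DistinctThreads) Run(int itemCount)
    {
        int initCalls = 0;
        var taskIds = new ConcurrentDictionary<int, byte>();
        var threadIds = new ConcurrentDictionary<int, byte>();
        long sum = 0;

        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            () =>
            {
                // Record every localInit call together with the task and thread it ran on.
                Interlocked.Increment(ref initCalls);
                taskIds.TryAdd(Task.CurrentId ?? -1, 0);
                threadIds.TryAdd(Thread.CurrentThread.ManagedThreadId, 0);
                return 0L;
            },
            (i, state, local) =>
            {
                Thread.Sleep(1); // blocking work, as in the question
                return local + i;
            },
            local => Interlocked.Add(ref sum, local));

        return (initCalls, taskIds.Count, threadIds.Count);
    }

    public static void Main()
    {
        var (inits, tasks, threads) = Run(2000);
        Console.WriteLine($"localInit calls: {inits}, distinct tasks: {tasks}, distinct threads: {threads}");
    }
}
```

If the number of localInit calls comes out well above the number of distinct threads, that is the behavior described in the question: the delegate follows tasks, and several tasks end up on the same thread.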

You might ask why there are more than 8 tasks being spawned. That answer is similar to the last, found in the documentation for ParallelOptions.MaxDegreeOfParallelism.


"Changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used."

This limit is only on the number of concurrent tasks, not a hard-limit on the number of tasks that will be created during the entire time it is processing. And as I mentioned above, there are times where a separate task will be spawned, which results in your localinit function being called multiple times and writing hundreds of files to disk.
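The difference between "concurrent" and "total" can be measured. The sketch below, with hypothetical names ConcurrencyDemo and Run, tracks the highest number of loop bodies running at the same moment and compares it with the number of localInit calls. It assumes nothing beyond the documented overload used in the question.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question or answer.
public static class ConcurrencyDemo
{
    // Returns the peak number of simultaneously running bodies and the
    // total number of localInit calls (one per task that was started).
    public static (int PeakConcurrency, int InitCalls) Run(int itemCount, int maxParallelism)
    {
        int running = 0, peak = 0, initCalls = 0;

        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            new ParallelOptions { MaxDegreeOfParallelism = maxParallelism },
            () => { Interlocked.Increment(ref initCalls); return 0; },
            (i, state, local) =>
            {
                int now = Interlocked.Increment(ref running);

                // Lock-free "store the maximum": retry until peak >= now.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                Thread.Sleep(2); // blocking work
                Interlocked.Decrement(ref running);
                return local + 1;
            },
            local => { });

        return (peak, initCalls);
    }

    public static void Main()
    {
        var (peak, inits) = Run(2000, 8);
        Console.WriteLine($"peak concurrency: {peak}, localInit calls: {inits}");
    }
}
```

The peak should stay at or below MaxDegreeOfParallelism, while the localInit count is free to grow past it as tasks are retired and replaced.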

Writing to disk is certainly an operation with a bit of latency, particularly if you're using synchronous I/O. When the disk operation happens, it blocks the entire thread; the same happens with Thread.Sleep. If a Task does this, it will block the thread it is currently running on, and no other tasks can run on it. Usually in these cases, the scheduler will spawn a new Task to help pick up the slack.


"And lastly, what's a good way to get my desired functionality, ThreadLocal?"

The bottom line is that thread locals don't make sense with Parallel.ForEach because you're not dealing with threads; you're dealing with tasks. A thread local could be shared between tasks because many tasks can run on the same thread over the course of the loop. Also, a task's thread local could change mid-execution, because the scheduler could preempt it from running and then continue its execution on a different thread, which would have a different thread local.

I'm not sure of the best way to do it, but you could rely on the localInit function to pass in whatever resource you'd like, only allowing a resource to be used by one task at a time. You can use localFinally to mark it as no longer in use and thus available for another task to acquire. This is what those methods were designed for; each is called only once per task that is spawned (see the remarks section of the Parallel.ForEach MSDN documentation).
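One way to read that suggestion is as a small pool of writers: localInit takes an idle writer if one exists and only creates a file otherwise, and localFinally hands the writer back instead of disposing it. The following is a sketch under that assumption, not a definitive implementation. PooledWriterDemo, Run, and the file naming are hypothetical, and the loop body just writes the item itself in place of the question's result.Write(writer).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original question or answer.
public static class PooledWriterDemo
{
    // Writes every item to one of a small set of files and returns the file paths.
    public static List<string> Run(int itemCount, int maxParallelism, string directory)
    {
        var idle = new ConcurrentBag<BinaryWriter>(); // writers not owned by any task right now
        var paths = new ConcurrentBag<string>();      // every file that was created

        try
        {
            Parallel.ForEach(
                Enumerable.Range(0, itemCount),
                new ParallelOptions { MaxDegreeOfParallelism = maxParallelism },
                // localInit: reuse an idle writer if possible, otherwise create a new file.
                () =>
                {
                    if (idle.TryTake(out var existing)) return existing;
                    string path = Path.Combine(directory, Guid.NewGuid().ToString("N") + ".bin");
                    paths.Add(path);
                    return new BinaryWriter(File.Create(path));
                },
                // body: the task owns its writer exclusively, so no locking is needed here.
                (item, state, writer) =>
                {
                    writer.Write(item);
                    return writer;
                },
                // localFinally: return the writer to the pool instead of disposing it.
                writer => idle.Add(writer));
        }
        finally
        {
            // Dispose (and flush) the writers once, after the whole loop has finished.
            while (idle.TryTake(out var writer)) writer.Dispose();
        }

        return paths.ToList();
    }

    public static void Main()
    {
        string dir = Directory.CreateDirectory(
            Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N"))).FullName;
        var files = Run(10000, 8, dir);
        Console.WriteLine($"files created: {files.Count} in {dir}");
    }
}
```

A new file is created only when every existing writer is held by a running task, so the file count should track the number of concurrent tasks and not the total number of tasks spawned.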

You can also split the work yourself, create your own set of threads, and run your work on them. However, this is less ideal, in my opinion, since the Parallel class already does this heavy lifting for you.
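For completeness, a manual split could look roughly like the sketch below. It assumes a fixed number of dedicated threads, each with its own file, and uses Partitioner.Create with load balancing to hand out the items. ManualSplitDemo, Run, and the worker-N.bin file names are hypothetical, and error handling is left out: an unhandled exception on one of these threads would bring down the process.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

// Hypothetical demo class, not part of the original question or answer.
public static class ManualSplitDemo
{
    // Starts exactly workerCount threads; each one drains its own partition
    // into its own file. Returns the paths of the files that were written.
    public static List<string> Run(int[] values, int workerCount, string directory)
    {
        // Load-balanced partitions: each enumerator pulls chunks on demand.
        var partitions = Partitioner.Create(values, loadBalance: true).GetPartitions(workerCount);
        var paths = new string[partitions.Count];
        var threads = new Thread[partitions.Count];

        for (int w = 0; w < partitions.Count; w++)
        {
            int index = w; // copy the loop variable so each closure sees its own value
            paths[index] = Path.Combine(directory, $"worker-{index}.bin");
            threads[index] = new Thread(() =>
            {
                using (var partition = partitions[index])
                using (var writer = new BinaryWriter(File.Create(paths[index])))
                {
                    while (partition.MoveNext())
                        writer.Write(partition.Current); // stand-in for real processing
                }
            });
            threads[index].Start();
        }

        foreach (var thread in threads) thread.Join();
        return paths.ToList();
    }

    public static void Main()
    {
        string dir = Directory.CreateDirectory(
            Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N"))).FullName;
        var files = Run(Enumerable.Range(0, 10000).ToArray(), 8, dir);
        Console.WriteLine($"files created: {files.Count} in {dir}");
    }
}
```

This gives exactly one file per worker, at the cost of giving up the scheduling, cancellation, and exception aggregation that Parallel.ForEach provides.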
