Why does the localInit Func get called multiple times per thread in Parallel.ForEach

Question

I was writing some code to process a lot of data, and I thought it would be useful to have Parallel.ForEach create a file for each thread it creates so the output doesn't need to be synchronized (by me at least).

It looked something like this:

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();

        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));

        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());

What I expected to happen was that up to 8 files would be created and would persist through the entire run time. Then each would be Disposed when the entire ForEach call finishes. What really happens is that the localInit seems to be called once for each item, so I end up with hundreds of files. The writers are also getting disposed at the end of each item that is processed.

This shows the same thing happening:

var vals = Enumerable.Range(0, 10000000).ToArray();
long sum = 0;
Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
    (i, state, common) =>
    {
        Thread.Sleep(10);
        return common + i;
    },
    (common) => Interlocked.Add(ref sum, common));

I get:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

Note: if I leave out the Thread.Sleep call, it sometimes seems to function "correctly". localInit only gets called once each for the 4 threads that it decides to use on my pc. Not every time, however.

Is this the desired behavior of the function? What's going on behind the scenes that causes it to do this? And lastly, what's a good way to get my desired functionality, ThreadLocal?

This is on .NET 4.5, by the way.

Recommended answer

Parallel.ForEach does not work as you think it does. It's important to note that the method is built on top of the Task class and that the relationship between Task and Thread is not 1:1. You can have, for example, 10 tasks that run on 2 managed threads.

Try using this line in your method body instead of the current one:

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                  Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

You should see that the ThreadId is reused across many different tasks, which are distinguished by their unique ids. You'll see this more if you leave in, or increase, your call to Thread.Sleep.
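To make that comparison concrete, here is a minimal sketch that counts how many distinct task ids and thread ids the loop body sees. The class and method names (TaskVersusThreadDemo, Count) are made up for illustration, and the numbers you get will depend on your machine, runtime version, and thread pool state, so treat it as an experiment rather than a specification.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskVersusThreadDemo
{
    // Runs a small parallel loop and reports how many distinct tasks and
    // distinct managed threads executed the loop body.
    public static (int Tasks, int Threads) Count(int itemCount, int sleepMs)
    {
        var taskIds = new ConcurrentDictionary<int, bool>();
        var threadIds = new ConcurrentDictionary<int, bool>();

        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            item =>
            {
                // Task.CurrentId is expected to be set here because the body
                // runs inside a task; -1 is only a defensive fallback.
                taskIds.TryAdd(Task.CurrentId ?? -1, true);
                threadIds.TryAdd(Thread.CurrentThread.ManagedThreadId, true);
                Thread.Sleep(sleepMs); // simulate blocking work, as in the question
            });

        return (taskIds.Count, threadIds.Count);
    }
}
```

Calling TaskVersusThreadDemo.Count(2000, 10) and printing both numbers should typically show more tasks than threads, which is the same effect as the repeated "init" lines in the question.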

The (very) basic idea of how the Parallel.ForEach method works is that it takes your enumerable and creates a series of tasks that each process a section of the enumeration; the way this is done depends a lot on the input. There is also some special logic that checks for the case of a task exceeding a certain number of milliseconds without completing. If that case is true, then a new task may be spawned to help relieve the work.

If you look at the documentation for the localInit function in Parallel.ForEach, you'll notice that it says that it returns the initial state of the local data for each _task_, not each thread.

You might ask why there are more than 8 tasks being spawned. That answer is similar to the last, found in the documentation for ParallelOptions.MaxDegreeOfParallelism.

Changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.

This limit is only on the number of concurrent tasks, not a hard limit on the number of tasks that will be created during the entire time it is processing. And as I mentioned above, there are times when a separate task will be spawned, which results in your localInit function being called multiple times and writing hundreds of files to disk.
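A rough way to check this yourself is to count localInit and localFinally calls while tracking how many loop bodies are running at once. The sketch below does that; LocalInitCountDemo and Run are illustrative names, and the number of localInit calls it reports will vary from run to run.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LocalInitCountDemo
{
    public static (int InitCalls, int FinallyCalls, int PeakConcurrency, long Sum)
        Run(int itemCount, int maxDegree, int sleepMs)
    {
        int initCalls = 0, finallyCalls = 0, running = 0, peak = 0;
        long sum = 0;

        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            new ParallelOptions { MaxDegreeOfParallelism = maxDegree },
            // localInit: called once per task, not once per thread.
            () => { Interlocked.Increment(ref initCalls); return 0L; },
            (item, state, local) =>
            {
                // Track the highest number of bodies running at the same time.
                int now = Interlocked.Increment(ref running);
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                    Interlocked.CompareExchange(ref peak, now, seen);

                Thread.Sleep(sleepMs); // simulate blocking work
                Interlocked.Decrement(ref running);
                return local + item;
            },
            // localFinally: called once per task, paired with localInit.
            local =>
            {
                Interlocked.Increment(ref finallyCalls);
                Interlocked.Add(ref sum, local);
            });

        return (initCalls, finallyCalls, peak, sum);
    }
}
```

With a non-trivial sleep, InitCalls can be well above maxDegree while PeakConcurrency stays at or below it, which matches the distinction between concurrent tasks and total tasks described above.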

Writing to disk is certainly an operation with a bit of latency, particularly if you're using synchronous I/O. When the disk operation happens, it blocks the entire thread; the same happens with Thread.Sleep. If a Task does this, it will block the thread it is currently running on, and no other tasks can run on it. Usually in these cases, the scheduler will spawn a new Task to help pick up the slack.

And lastly, what's a good way to get my desired functionality, ThreadLocal?

The bottom line is that thread locals don't make sense with Parallel.ForEach because you're not dealing with threads; you're dealing with tasks. A thread local could be shared between tasks because many tasks can end up running on the same thread, one after another. Also, a task's thread local could change mid-execution, because the scheduler could preempt it from running and then continue its execution on a different thread, which would have a different thread local.

I'm not sure of the best way to do it, but you could rely on the localInit function to pass in whatever resource you'd like, only allowing a resource to be used by one task at a time. You can use localFinally to mark it as no longer in use and thus available for another task to acquire. This is what those methods were designed for; each method is only called once per task that is spawned (see the remarks section of the Parallel.ForEach MSDN documentation).
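One possible shape for that idea is a small pool: localInit takes an idle resource from the pool (or creates one if none is idle), and localFinally puts it back. The sketch below is only an assumption about how this could look. Sink is a hypothetical in-memory stand-in for the question's file-backed BinaryWriter, and PooledLocalStateDemo and Run are illustrative names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PooledLocalStateDemo
{
    // Hypothetical stand-in for a writer backed by a file.
    public sealed class Sink
    {
        public readonly List<int> Items = new List<int>();
    }

    public static (int SinksCreated, int ItemsWritten) Run(int itemCount, int maxDegree)
    {
        var pool = new ConcurrentBag<Sink>();
        int created = 0;

        Parallel.ForEach(
            Enumerable.Range(0, itemCount),
            new ParallelOptions { MaxDegreeOfParallelism = maxDegree },
            // localInit: reuse an idle sink if one exists, otherwise create one.
            () =>
            {
                if (pool.TryTake(out var idle)) return idle;
                Interlocked.Increment(ref created);
                return new Sink();
            },
            // body: only the task that currently holds the sink writes to it.
            (item, state, sink) =>
            {
                sink.Items.Add(item);
                return sink;
            },
            // localFinally: hand the sink back so a later task can pick it up.
            sink => pool.Add(sink));

        // Every sink is back in the pool once the loop has returned.
        int written = pool.Sum(s => s.Items.Count);
        return (created, written);
    }
}
```

Because a new sink is only created when every existing one is held by a running task, the number of sinks should stay at or below MaxDegreeOfParallelism no matter how many tasks the loop spawns. With real files you would dispose everything left in the pool after Parallel.ForEach returns, instead of disposing in localFinally.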

You can also split the work yourself, and create your own set of threads and run your work. However, this is less ideal, in my opinion, since the Parallel class already does this heavy lifting for you.
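For comparison, a bare-bones version of the do-it-yourself approach might look like the sketch below: a fixed number of dedicated threads, each with its own output list, pulling the next index from a shared counter. ManualThreadsDemo and Run are illustrative names, and the sketch leaves out cancellation, exception handling, and early exit (the state.Break() in the question), which Parallel.ForEach gives you for free.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ManualThreadsDemo
{
    // Processes items on exactly threadCount dedicated threads and returns
    // one output list per thread.
    public static List<int>[] Run(int[] items, int threadCount)
    {
        var outputs = new List<int>[threadCount];
        var threads = new Thread[threadCount];
        int next = -1; // shared index of the most recently claimed item

        for (int t = 0; t < threadCount; t++)
        {
            int slot = t; // copy so each closure sees its own slot
            outputs[slot] = new List<int>();
            threads[slot] = new Thread(() =>
            {
                int i;
                // Claim the next unprocessed index until the input runs out.
                while ((i = Interlocked.Increment(ref next)) < items.Length)
                    outputs[slot].Add(items[i]);
            });
            threads[slot].Start();
        }

        foreach (var thread in threads)
            thread.Join();

        return outputs;
    }
}
```

Here the one-resource-per-thread guarantee holds by construction, at the cost of managing the threads and the work distribution yourself.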
