How does local initialization with Parallel.ForEach work?

Question

I am unsure about the use of the local init function in Parallel.ForEach, as described in the MSDN article: http://msdn.microsoft.com/en-us/library/dd997393.aspx

Parallel.ForEach<int, long>(nums, // source collection
   () => 0, // method to initialize the local variable
   (j, loop, subtotal) => // method invoked by the loop on each iteration
   {
      subtotal += nums[j]; //modify local variable 
      return subtotal; // value to be passed to next iteration
   },...

How does () => 0 initialize anything? What's the name of the variable and how can I use it in the loop logic?

Recommended answer

With reference to the following overload of the static Parallel.ForEach method (the third parameter is named body in the framework; it is called taskBody here for clarity):

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
    Action<TLocal> localFinally
)



In your specific example, the line:

() => 0, // method to initialize the local variable



is simply a lambda (anonymous function) which returns the constant integer zero. This lambda is passed as the localInit parameter to Parallel.ForEach. Since the lambda returns an integer, it has type Func<int>, and the compiler can infer TLocal as int (similarly, TSource can be inferred from the type of the collection passed as the source parameter). If the type arguments are given explicitly, as in Parallel.ForEach<int, long>, TLocal is long and the literal 0 is implicitly converted.

The return value (0) is then passed as the third parameter (named subtotal) to the taskBody Func. This 0 is used as the initial seed for the loop body:

(j, loop, subtotal) =>
{
    subtotal += nums[j]; //modify local variable 
    return subtotal;     // value to be passed to next iteration
}

This second lambda (passed to taskBody) is called N times, where N is the number of items allocated to this task by the TPL partitioner.

Each subsequent call to the taskBody lambda is passed the value of subtotal returned by the previous call, effectively calculating a running partial total for this Task. After all the items assigned to this task have been added, the third and last function parameter, localFinally, is called and is passed the final value of subtotal returned from taskBody. Because several such tasks operate in parallel, a final step is also needed to add up all the partial totals into the final 'grand' total. However, because multiple concurrent tasks (on different threads) could be contending for the grandTotal variable, it is important that changes to it are made in a thread-safe manner.

(I've changed names of the MSDN variables to make it more clear)

long grandTotal = 0;
Parallel.ForEach(nums,            // source collection
  () => 0,                        // method to initialize the local variable
                                  // (TLocal is inferred as int; use 0L if a partial sum could exceed int.MaxValue)
  (j, loop, subtotal) =>          // method invoked by the loop on each iteration
  {
     subtotal += nums[j];         // modify local variable
     return subtotal;             // value to be passed to next iteration
  },
  // The final value of subtotal is passed to the localFinally function parameter
  (subtotal) => Interlocked.Add(ref grandTotal, subtotal)
);

In general

The localInit / body / localFinally overloads of Parallel.For / Parallel.ForEach allow once-per-task initialization and cleanup code to be run before and after (respectively) the taskBody iterations performed by the Task.

(Note that the For range / enumerable passed to Parallel.For / Parallel.ForEach is partitioned into batches, each of which is allocated to a Task.)

In each Task, localInit will be called once, the body code will be repeatedly invoked, once per item in batch (0..N times), and localFinally will be called once upon completion.
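
The call pattern described above can be observed with a small counting experiment. This is only a sketch, written as C# top-level statements; the names nums, initCalls, bodyCalls and finallyCalls are made up for illustration, and the number of tasks (and therefore of localInit / localFinally calls) depends on the partitioner and the machine, so it is printed rather than assumed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical input: the integers 0..99999
int[] nums = Enumerable.Range(0, 100_000).ToArray();

int initCalls = 0, bodyCalls = 0, finallyCalls = 0;
long grandTotal = 0;

Parallel.ForEach(nums,
    // localInit: runs once per task; returning 0L makes TLocal a long
    () => { Interlocked.Increment(ref initCalls); return 0L; },
    // body: runs once per item, carrying the task-local subtotal along
    (item, loopState, subtotal) =>
    {
        Interlocked.Increment(ref bodyCalls);
        return subtotal + item;
    },
    // localFinally: runs once per task with that task's final subtotal
    subtotal =>
    {
        Interlocked.Increment(ref finallyCalls);
        Interlocked.Add(ref grandTotal, subtotal);
    });

Console.WriteLine($"localInit: {initCalls}, body: {bodyCalls}, localFinally: {finallyCalls}");
Console.WriteLine($"grandTotal: {grandTotal}");
```

The body count always equals the number of items, and localInit and localFinally are called the same number of times (once per task), while that number itself varies from run to run.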

In addition, you can pass any state required for the duration of the task (i.e. to the taskBody and localFinally delegates) via a generic TLocal return value from the localInit Func - I've called this variable taskLocals below.

Common uses of "localInit":


  • Creating and initializing expensive resources needed by the loop body, like a database connection or a web service connection.
  • Keeping Task-Local variables to hold (uncontended) running totals or collections
  • If you need to return multiple objects from localInit to the taskBody and localFinally, you can make use of a strongly typed class, a Tuple<,,> or, if you use only lambdas for the localInit / taskBody / localFinally, you can also pass data via an anonymous class. Note that if you use the return value of localInit to share a reference type among multiple tasks, you will need to consider thread safety on this object - immutability is preferable.
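
As an illustration of the last point, here is a minimal sketch that carries two task-local values (a sum and a count) in a value tuple. It assumes C# 7 or later for the tuple syntax and is written as top-level statements; values, totalSum and totalCount are hypothetical names, not part of the original example.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical input: the integers 1..1000
int[] values = Enumerable.Range(1, 1000).ToArray();

long totalSum = 0;
int totalCount = 0;

Parallel.ForEach(values,
    // localInit: TLocal is inferred as the tuple type (long Sum, int Count)
    () => (Sum: 0L, Count: 0),
    // body: tuples are value types, so each iteration returns an updated copy
    (item, loopState, local) => (local.Sum + item, local.Count + 1),
    // localFinally: fold this task's partial results into the shared totals
    local =>
    {
        Interlocked.Add(ref totalSum, local.Sum);
        Interlocked.Add(ref totalCount, local.Count);
    });

Console.WriteLine($"sum = {totalSum}, count = {totalCount}, average = {(double)totalSum / totalCount}");
```

Because the tuple is a value type that is copied between iterations, no state is shared between tasks until localFinally runs.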

Common uses of the "localFinally" Action:

  • To release resources such as IDisposables used in the taskLocals (e.g. database connections, file handles, web service clients, etc.)
  • To aggregate / combine / reduce the work done by each task back into shared variable(s). These shared variables will be contended, so thread safety is a concern:
    • e.g. Interlocked.Increment on primitive types like integers
    • lock or similar will be required for write operations
    • Make use of the concurrent collections to save time and effort.
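
The aggregation case can be sketched as follows: each task fills its own uncontended List<long>, and localFinally merges it into a shared list under a lock. This is an assumption-laden illustration rather than the only way to do it (a concurrent collection would also work); source, allSquaresOfEvens and mergeLock are hypothetical names, and the code uses top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical input: the integers 1..10000
int[] source = Enumerable.Range(1, 10_000).ToArray();

var allSquaresOfEvens = new List<long>();   // shared, so writes must be synchronized
object mergeLock = new object();

Parallel.ForEach(source,
    // localInit: one private list per task, so no contention in the body
    () => new List<long>(),
    // body: append to the task-local list only
    (item, loopState, localList) =>
    {
        if (item % 2 == 0) localList.Add((long)item * item);
        return localList;
    },
    // localFinally: the only place that touches the shared list
    localList =>
    {
        lock (mergeLock) { allSquaresOfEvens.AddRange(localList); }
    });

// The merge order depends on task scheduling, so sort before relying on order
allSquaresOfEvens.Sort();
Console.WriteLine(allSquaresOfEvens.Count);
```

The lock is taken once per task rather than once per item, which is the main reason to prefer this shape over locking inside the body.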

The taskBody is the tight part of the loop operation - you'll want to optimize this for performance.

This is all best summarized with a commented example:

using System;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;

public void MyParallelizedMethod()
{
    // Shared variable. Not thread safe, so it is only updated via Interlocked below
    var itemCount = 0;

    Parallel.ForEach(myEnumerable,
        // localInit - called once per Task.
        () =>
        {
            // Task-local variables have no contention,
            // since a Task is never run by multiple threads concurrently
            var sqlConnection = new SqlConnection("connstring...");
            sqlConnection.Open();

            // This is the task-local state we wish to carry for the duration of the task
            return new
            {
                Conn = sqlConnection,
                RunningTotal = 0
            };
        },
        // Task body. Invoked once per item in the batch assigned to this task
        (item, loopState, taskLocals) =>
        {
            var runningTotal = taskLocals.RunningTotal;
            // ... Do some fancy Sql work here on our task's independent connection
            using (var command = taskLocals.Conn.CreateCommand())
            using (var reader = command.ExecuteReader(...))
            {
                if (reader.Read())
                {
                    // No contention for the task-local total
                    runningTotal += Convert.ToInt32(reader["countOfItems"]);
                }
            }
            // Anonymous-type properties are read-only, so return a new instance
            // of the same type as the taskLocals parameter
            return new { taskLocals.Conn, RunningTotal = runningTotal };
        },
        // localFinally - called once per Task after its body iterations complete.
        // Also takes the taskLocals
        (taskLocals) =>
        {
            // Any cleanup work on our task locals (as you would do in a `finally` scope)
            if (taskLocals.Conn != null)
                taskLocals.Conn.Dispose();

            // Do any reduce / aggregate / synchronisation work.
            // NB: There is contention here!
            Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
        });
}

And more examples:

Example of per-task uncontended dictionaries

Example of per-task database connections: http://stackoverflow.com/questions/27774265/c-sharp-multiple-parallel-inserts-in-database
