在同一查询中使用AsParallel()和AsSequential()的影响?C# [英] Impact of using AsParallel() and AsSequential() in the same query? C#

查看:66
本文介绍了在同一查询中使用AsParallel()和AsSequential()的影响?C#的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在其中一本书中浏览了PLINQ,上面写着:

I was going through PLINQ in one of the books and it said:

如果您有一个复杂的查询可以从并行处理中受益但也有一些应按顺序完成的部分,您可以使用AsSequential可以阻止并行处理您的查询.

If you have a complex query that can benefit from parallel processing but also has some parts that should be done sequentially, you can use the AsSequential to stop your query from being processed in parallel.

例如:

var parallelResult = numbers.AsParallel().AsOrdered()
    .Where(i => i % 2 == 0).AsSequential();

我想了解为什么允许使用它,并且对结果有什么影响?它是并行运行的吗?它按顺序运行吗?现在没有任何意义.

I want to understand why is it allowed and what is the impact on the result? Is it running parallel? Is it running sequential? It isn't making any sense right now.

推荐答案

您可以将LINQ查询概念化为具有单个执行计划的原子构造,但是将其概念化是由多个数据流块组成的管道可能会更有用.每个块的输出将成为数据流中下一个块的输入,并且一旦这些块可用,这些块就会同时处理它们.以下一个查询为例,该查询由两个块"组成.由两个 Select 运算符表示.第一块配置为一次(并行)处理3个项目,第二块配置为顺序处理每个项目.对于并行块,每个项目的处理持续时间为1000毫秒,对于顺序块,每个项目的处理持续时间为500毫秒:

You may conceptualize a LINQ query as an atomic construct with a single execution plan, but it may be more helpful to conceptualize is as a pipeline consisting of multiple dataflow blocks. The output of each block becomes the input of the next block in the dataflow, and the blocks are processing items concurrently, as soon as they become available. Take a look for example to the next query, consisting of two "blocks" represented by the two Select operators. The first block is configured to process 3 items at a time (in parallel), while to second block is configured to process each item sequentially. The processing duration of each item is 1000 msec for the parallel block, and 500 msec for the sequential block:

var results = Partitioner
    .Create(Enumerable.Range(1, 10), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(3)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
        Thread.Sleep(1000); // Simulate some CPU-bound work
        return x;
    })
    .AsSequential()
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
        Thread.Sleep(500); // Simulate some CPU-bound work
        return x;
    })
    .ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

如果运行此代码,您将得到如下输出:

If you run this code you'll get an output like this:

08:32:17.628 [4] Parallel #2
08:32:17.628 [5] Parallel #1
08:32:17.628 [6] Parallel #3
08:32:18.642 [6] Parallel #5
08:32:18.642 [5] Parallel #4
08:32:18.644 [4] Parallel #6
08:32:18.651 [1] Sequential #1
08:32:19.644 [6] Parallel #7
08:32:19.645 [4] Parallel #8
08:32:19.646 [5] Parallel #9
08:32:19.654 [1] Sequential #2
08:32:20.156 [1] Sequential #3
08:32:20.648 [4] Parallel #10
08:32:20.658 [1] Sequential #4
08:32:21.161 [1] Sequential #5
08:32:21.663 [1] Sequential #6
08:32:22.164 [1] Sequential #7
08:32:22.672 [1] Sequential #8
08:32:23.173 [1] Sequential #9
08:32:23.675 [1] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

在完成所有并行处理之前,请注意如何开始顺序处理.为了达到此效果,我使用了配置选项 EnumerablePartitionerOptions.NoBuffering ParallelMergeOptions.NotBuffered ,以防止第一个块缓冲其输入和输出.

Notice how the sequential processing is already started before all parallel processing has been completed. To achieve this effect I used the configuration options EnumerablePartitionerOptions.NoBuffering and ParallelMergeOptions.NotBuffered, to prevent the first block from buffering its input and output.

为完整起见,请使用

For completeness lets rewrite this query using the TPL Dataflow library. The code becomes more verbose and less fluent, but the control of execution becomes more precise, and also asynchronous workflows become available (PLINQ is not async-friendly):

var block1 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
    await Task.Delay(1000); // Simulate some I/O operation
    return x;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 3,
    EnsureOrdered = true // redundant since EnsureOrdered is the default
});

var block2 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
    await Task.Delay(500); // Simulate some I/O operation
    return x;
}); // MaxDegreeOfParallelism = 1 is the default

block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });

// Feeding the first block
foreach (var x in Enumerable.Range(1, 10))
{
    await block1.SendAsync(x);
}
block1.Complete();

var results = new List<int>(); // Collecting the results is a bit painful
while (await block2.OutputAvailableAsync())
{
    while (block2.TryReceive(out var result))
    {
        results.Add(result);
    }
}
await block2.Completion;
Console.WriteLine($"Results: {String.Join(", ", results)}");

输出:

08:59:25.102 [6] Parallel #2
08:59:25.102 [4] Parallel #1
08:59:25.102 [7] Parallel #3
08:59:26.127 [7] Parallel #4
08:59:26.129 [6] Parallel #5
08:59:26.143 [4] Parallel #6
08:59:26.147 [5] Sequential #1
08:59:26.648 [5] Sequential #2
08:59:27.129 [6] Parallel #7
08:59:27.129 [7] Parallel #8
08:59:27.144 [4] Parallel #9
08:59:27.149 [5] Sequential #3
08:59:27.650 [5] Sequential #4
08:59:28.131 [6] Parallel #10
08:59:28.152 [5] Sequential #5
08:59:28.653 [5] Sequential #6
08:59:29.155 [5] Sequential #7
08:59:29.659 [5] Sequential #8
08:59:30.160 [5] Sequential #9
08:59:30.674 [5] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

这篇关于在同一查询中使用AsParallel()和AsSequential()的影响?C#的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆