How can I make this C# loop faster?

Question

Executive Summary: Reed's answer below is the fastest if you want to stay in C#. If you're willing to marshal to C++ (which I am), that's a faster solution.

I have two 55mb ushort arrays in C#. I am combining them using the following loop:

float b = (float)number / 100.0f;
for (int i = 0; i < length; i++)
{
      image.DataArray[i] = 
          (ushort)(mUIHandler.image1.DataArray[i] + 
          (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
}

Timed with DateTime.Now calls placed before and after the loop, this code takes 3.5 seconds to run. How can I make it faster?

EDIT: Here is some code that, I think, shows the root of the problem. When the following code is run in a brand new WPF application, I get these timing results:

Time elapsed: 00:00:00.4749156 //arrays added directly
Time elapsed: 00:00:00.5907879 //arrays contained in another class
Time elapsed: 00:00:02.8856150 //arrays accessed via accessor methods

So when arrays are walked directly, the time is much faster than if the arrays are inside of another object or container. This code shows that somehow, I'm using an accessor method, rather than accessing the arrays directly. Even so, the fastest I seem to be able to get is half a second. When I run the second listing of code in C++ with icc, I get:

Run time for pointer walk: 0.0743338

In this case, then, C++ is 7x faster (using icc, not sure if the same performance can be obtained with msvc-- I'm not as familiar with optimizations there). Is there any way to get C# near that level of C++ performance, or should I just have C# call my C++ routine?
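If the C++ route is taken, the blend loop has to be exposed as a plain C function that C# can call. The following is a minimal sketch of such a routine, not the code the asker actually shipped: the name blend_u16, the BLEND_API macro, and the parameter order are all assumptions made for illustration. It performs the same per-element arithmetic as the loops in the question.

```cpp
#include <cstddef>
#include <cstdint>

// BLEND_API is a hypothetical macro (not from the original post) that
// gives the function C linkage and, on Windows, exports it from a DLL.
#if defined(_WIN32)
#define BLEND_API extern "C" __declspec(dllexport)
#else
#define BLEND_API extern "C"
#endif

// blend_u16 is a hypothetical name. It computes
//   out[i] = in1[i] + (unsigned short)(b * in2[i])
// for every element, wrapping to 16 bits like the unchecked C# cast.
// Assumes b * in2[i] stays within the range of a 16-bit unsigned value,
// which holds for the question's data (values below 16384, b = 0.44).
BLEND_API void blend_u16(const std::uint16_t* in1,
                         const std::uint16_t* in2,
                         std::uint16_t* out,
                         std::size_t length,
                         float b)
{
    for (std::size_t i = 0; i < length; ++i) {
        const std::uint16_t scaled =
            static_cast<std::uint16_t>(b * static_cast<float>(in2[i]));
        out[i] = static_cast<std::uint16_t>(in1[i] + scaled);
    }
}
```

On the C# side, a matching [DllImport] declaration taking three ushort[] parameters would be the usual way to call this. Arrays of primitive types such as ushort are normally pinned rather than copied for the duration of the call, so the marshalling overhead should be small compared with the loop itself, but that is worth measuring rather than assuming.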

Listing 1, C# code:

public class ArrayHolder
{
    int length;
    public ushort[] output;
    public ushort[] input1;
    public ushort[] input2;
    public ArrayHolder(int inLength)
    {
        length = inLength;
        output = new ushort[length];
        input1 = new ushort[length];
        input2 = new ushort[length];
    }

    public ushort[] getOutput() { return output; }
    public ushort[] getInput1() { return input1; }
    public ushort[] getInput2() { return input2; }
}


/// <summary>
/// Interaction logic for MainWindow.xaml
/// </summary>
public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent();


        Random random = new Random();

        int length = 55 * 1024 * 1024;
        ushort[] output = new ushort[length];
        ushort[] input1 = new ushort[length];
        ushort[] input2 = new ushort[length];

        ArrayHolder theArrayHolder = new ArrayHolder(length);

        for (int i = 0; i < length; i++)
        {
            output[i] = (ushort)random.Next(0, 16384);
            input1[i] = (ushort)random.Next(0, 16384);
            input2[i] = (ushort)random.Next(0, 16384);
            theArrayHolder.getOutput()[i] = output[i];
            theArrayHolder.getInput1()[i] = input1[i];
            theArrayHolder.getInput2()[i] = input2[i];
        }

        Stopwatch stopwatch = new Stopwatch(); 
        stopwatch.Start();
        int number = 44;
        float b = (float)number / 100.0f;
        for (int i = 0; i < length; i++)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b * (float)input2[i]));
        } 
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        for (int i = 0; i < length; i++)
        {
            theArrayHolder.output[i] =
                (ushort)(theArrayHolder.input1[i] +
                (ushort)(b * (float)theArrayHolder.input2[i]));
        }
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        for (int i = 0; i < length; i++)
        {
            theArrayHolder.getOutput()[i] =
                (ushort)(theArrayHolder.getInput1()[i] +
                (ushort)(b * (float)theArrayHolder.getInput2()[i]));
        }
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
    }
}

Listing 2, C++ equivalent:

// looptiming.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <stdlib.h>
#include <windows.h>
#include <stdio.h>
#include <iostream>


int _tmain(int argc, _TCHAR* argv[])
{

    int length = 55*1024*1024;
    unsigned short* output = new unsigned short[length];
    unsigned short* input1 = new unsigned short[length];
    unsigned short* input2 = new unsigned short[length];
    unsigned short* outPtr = output;
    unsigned short* in1Ptr = input1;
    unsigned short* in2Ptr = input2;
    int i;
    const int max = 16384;
    for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
        *outPtr = rand()%max;
        *in1Ptr = rand()%max;
        *in2Ptr = rand()%max;
    }

    LARGE_INTEGER ticksPerSecond;
    LARGE_INTEGER tick1, tick2;   // A point in time
    LARGE_INTEGER time;   // For converting tick into real time


    QueryPerformanceCounter(&tick1);

    outPtr = output;
    in1Ptr = input1;
    in2Ptr = input2;
    int number = 44;
    float b = (float)number/100.0f;


    for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
        *outPtr = *in1Ptr + (unsigned short)((float)*in2Ptr * b);
    }
    QueryPerformanceCounter(&tick2);
    QueryPerformanceFrequency(&ticksPerSecond);

    time.QuadPart = tick2.QuadPart - tick1.QuadPart;

    std::cout << "Run time for pointer walk: " << (double)time.QuadPart/(double)ticksPerSecond.QuadPart << std::endl;

    return 0;
}

EDIT 2: Enabling /QxHost in the second example drops the time down to 0.0662714 seconds. Modifying the first loop as @Reed suggested gets me down to

Time elapsed: 00:00:00.3835017

So, still not fast enough for a slider. That time comes from the following code:

        stopwatch.Start();
        Parallel.ForEach(Partitioner.Create(0, length),
         (range) =>
         {
             for (int i = range.Item1; i < range.Item2; i++)
             {
                 output[i] =
                     (ushort)(input1[i] +
                     (ushort)(b * (float)input2[i]));
             }
         });

        stopwatch.Stop();

EDIT 3: As per @Eric Lippert's suggestion, I've rerun the C# code in a release build and, rather than running with a debugger attached, printed the results to a dialog. They are:

  • Simple arrays: ~0.273s
  • Contained arrays: ~0.330s
  • Accessor arrays: ~0.345s
  • Parallel arrays: ~0.190s

(These numbers are the average of 5 runs.)

So the parallel solution is definitely faster than the 3.5 seconds I was getting before, but it still falls short of the 0.074 seconds achieved by the icc-compiled C++ code. It seems, therefore, that the fastest solution is to compile in release and then marshal to an icc-compiled C++ executable, which makes using a slider possible here.

EDIT 4: Three more suggestions from @Eric Lippert: change the inside of the for loop from length to array.length, use doubles, and try unsafe code.

For those three, the timings are now:

  • Length: ~0.274s
  • Doubles instead of floats: ~0.290s
  • Unsafe: ~0.376s

So far, the parallel solution is the big winner. Although if I could add these via a shader, maybe I could see some kind of speedup there...

Here is the extra code:

        stopwatch.Reset();

        stopwatch.Start();

        double b2 = ((double)number) / 100.0;
        for (int i = 0; i < output.Length; ++i)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b2 * (double)input2[i]));
        }

        stopwatch.Stop();
        // TotalSeconds avoids the unpadded Seconds + "." + Milliseconds form, which shows 74 ms as "0.74".
        DoubleArrayLabel.Content += "\t" + stopwatch.Elapsed.TotalSeconds.ToString("F3");
        stopwatch.Reset();

        stopwatch.Start();

        for (int i = 0; i < output.Length; ++i)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b * input2[i]));
        }

        stopwatch.Stop();
        // TotalSeconds avoids the unpadded Seconds + "." + Milliseconds form, which shows 74 ms as "0.74".
        LengthArrayLabel.Content += "\t" + stopwatch.Elapsed.TotalSeconds.ToString("F3");
        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        unsafe
        {
            fixed (ushort* outPtr = output, in1Ptr = input1, in2Ptr = input2){
                ushort* outP = outPtr;
                ushort* in1P = in1Ptr;
                ushort* in2P = in2Ptr;
                for (int i = 0; i < output.Length; ++i, ++outP, ++in1P, ++in2P)
                {
                    *outP = (ushort)(*in1P + b * (float)*in2P);
                }
            }
        }

        stopwatch.Stop();
        // TotalSeconds avoids the unpadded Seconds + "." + Milliseconds form, which shows 74 ms as "0.74".
        UnsafeArrayLabel.Content += "\t" + stopwatch.Elapsed.TotalSeconds.ToString("F3");
        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);

Recommended answer

This should be perfectly parallelizable. However, given the small amount of work being done per element, you'll need to handle this with extra care.

The proper way to do this (in .NET 4) would be to use Parallel.ForEach in conjunction with a Partitioner:

float b = (float)number / 100.0f;
Parallel.ForEach(Partitioner.Create(0, length), 
(range) =>
{
   for (int i = range.Item1; i < range.Item2; i++)
   {
      image.DataArray[i] = 
          (ushort)(mUIHandler.image1.DataArray[i] + 
          (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
   }
});

This will efficiently partition the work across available processing cores in your system, and should provide a decent speedup if you have multiple cores.
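The same range-partitioning idea carries over to the C++ side of the question. The sketch below is an illustration under assumptions, not part of the original answer: the names blend_range and blend_parallel are hypothetical, and it simply hands one contiguous chunk to each hardware thread, which is cruder than the chunking Partitioner.Create performs. Each thread runs a tight loop over its own range, so the per-element work stays free of synchronization.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical helper: blends the half-open range [begin, end).
// Assumes b * in2[i] fits in 16 bits, as it does for the question's data.
static void blend_range(const std::uint16_t* in1, const std::uint16_t* in2,
                        std::uint16_t* out,
                        std::size_t begin, std::size_t end, float b)
{
    for (std::size_t i = begin; i < end; ++i) {
        const std::uint16_t scaled =
            static_cast<std::uint16_t>(b * static_cast<float>(in2[i]));
        out[i] = static_cast<std::uint16_t>(in1[i] + scaled);
    }
}

// Hypothetical entry point: splits [0, length) into one chunk per
// hardware thread and blends the chunks concurrently.
void blend_parallel(const std::uint16_t* in1, const std::uint16_t* in2,
                    std::uint16_t* out, std::size_t length, float b)
{
    unsigned workers = std::thread::hardware_concurrency();
    if (workers == 0) {
        workers = 1;  // hardware_concurrency may return 0 when unknown
    }
    // Round up so the chunks cover every element.
    const std::size_t chunk = (length + workers - 1) / workers;

    std::vector<std::thread> threads;
    for (std::size_t begin = 0; begin < length; begin += chunk) {
        const std::size_t end = std::min(begin + chunk, length);
        threads.emplace_back(blend_range, in1, in2, out, begin, end, b);
    }
    for (std::thread& t : threads) {
        t.join();  // wait for every chunk before returning
    }
}
```

Because the chunks never overlap, no locking is needed. For arrays this large the loop is likely limited by memory bandwidth as much as by arithmetic, so the speedup may well be lower than the core count.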

That being said, this will, at best, only speed up this operation by a factor equal to the number of cores in your system. If you need to speed it up more, you'll likely need to resort to a mix of parallelization and unsafe code. At that point, it might be worth thinking about alternatives to trying to present this in real time.
