Design help for parallel processing of Azure blobs and bulk copy to a SQL database (C#)

Question

I have a requirement to fetch blob files from Azure Storage, read through them, process the data, and store it in a database. The volume of data fetched from the blobs is high: around 40K records per file, and there are 70 such files in a folder.

This is how I designed it:

  • I use Parallel.ForEach on the list of blob files with a maximum degree of parallelism of 4.
  • In each iteration, I open a stream on the blob (OpenRead method), read through it and fill a DataTable. When the DataTable reaches 10,000 rows, I call SqlBulkCopy and insert the data into the database.

One blob folder contains 70 files.

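Parallel.ForEach (blob files, max parallelism 4)
{
    // Open a stream on the blob file (OpenRead)
    // Create a DataTable
    foreach (item in file)
    {
        // Add the item to the DataTable

        if (DataTable row count > 5000)
        {
            // SqlBulkCopy the DataTable to the DB
            // Clear the DataTable
        }
    }

    // Dispose the DataTable
}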

One thing I observed is that when I increase the degree of parallelism, the time taken to process a single file increases. Is that because I am opening multiple blob streams in parallel? Higher parallelism also causes more data to be held in memory at a time.

I would like to know two things:

  1. I would like to try a different design where I keep a single DataTable and fill it from the parallel foreach. When it reaches 10K records, I would store it in the DB and clear it. I don't know how to implement this.

  2. Whether there is a better approach for processing the files faster.

Recommended answer

Your current approach is quite logical. It is not optimal though, because each parallel workflow is composed of heterogeneous jobs that are not coordinated with the other workflows. For example, it is entirely possible that at a given moment all four parallel workflows are fetching data from Azure, at another moment all four are constructing DataTables from raw data, and at yet another moment all four are waiting for a response from the database.

All these heterogeneous jobs have different characteristics. For example, the interaction with the database may not be parallelizable, and sending 4 concurrent SqlBulkCopy commands to the database may actually be slower than sending them one after the other. On the other hand, creating DataTables in memory is probably highly parallelizable, and fetching data from Azure may benefit from parallelism only slightly (because the bottleneck could be the speed of your internet connection, and not the speed of the Azure servers). It is quite certain though that you could achieve a performance boost of 2x-3x just by making sure that at any given moment all the heterogeneous jobs are in progress. This is called task-parallelism, in contrast to the simpler data-parallelism (your current setup).

To achieve task-parallelism you need to create a pipeline, where the data flows from one processing block to the next until it reaches the final block. In your case you probably need 3 blocks:

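  1. Download the files from Azure and split them into raw records.
  2. Parse the records and push the parsed data into DataTables.
  3. Send the DataTables to the database for storage.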

Sending single records from the first block to the second block may not be optimal, because parallelism has overhead, and the more granular the workload, the more overhead it creates. So ideally you would chunkify the workload, batching the records into arrays before sending them to the next block. All this can be implemented with a great tool that is designed for exactly this kind of job, the TPL Dataflow library. It has blocks for transforming, batching, unbatching and whatnot. It is also very flexible and feature-rich regarding the options it offers. But since it has some learning curve, I have something more familiar to suggest as the infrastructure for the pipeline: the PLINQ library.
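
For comparison, here is a minimal sketch of how the three blocks could be wired with TPL Dataflow. It is only an illustration under assumptions: the three delegates (downloadFileRecords, createDataTable, saveDataTable) are hypothetical placeholders for your own code, and the degrees of parallelism and the batch size of 1000 are arbitrary starting points, not measured values.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowPipelineSketch
{
    // Runs download -> batch -> parse -> save and returns the number of rows handed to the saver.
    // All three delegates are hypothetical stand-ins for the real Azure/SQL code.
    public static async Task<int> RunAsync(
        IEnumerable<string> files,
        Func<string, IEnumerable<string>> downloadFileRecords, // blob name -> raw records
        Func<string[], DataTable> createDataTable,             // batch of records -> DataTable
        Action<DataTable> saveDataTable,                       // e.g. a SqlBulkCopy call
        int batchSize = 1000)
    {
        // Block 1: download each file and emit its raw records one by one.
        var download = new TransformManyBlock<string, string>(
            downloadFileRecords,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        // Groups single records into arrays of batchSize (the last batch may be smaller).
        var batch = new BatchBlock<string>(batchSize);

        // Block 2: parse each batch into a DataTable, several batches at a time.
        var parse = new TransformBlock<string[], DataTable>(
            createDataTable,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

        // Block 3: save the DataTables one at a time.
        int totalRows = 0;
        var save = new ActionBlock<DataTable>(table =>
        {
            saveDataTable(table);
            totalRows += table.Rows.Count; // safe: this block processes one table at a time
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Completion flows down the chain, so awaiting the last block is enough.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        download.LinkTo(batch, link);
        batch.LinkTo(parse, link);
        parse.LinkTo(save, link);

        foreach (var file in files) download.Post(file);
        download.Complete();
        await save.Completion;
        return totalRows;
    }
}
```

In this sketch no block has a BoundedCapacity, so nothing limits how far the download block can run ahead of the database. If memory is a concern, bounding the blocks (and feeding them with SendAsync instead of Post) is the usual next step.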

Any time you add the AsParallel operator to a query, a new processing block is started. To force the data to flow to the next block as fast as possible, the WithMergeOptions(ParallelMergeOptions.NotBuffered) operator is needed. To control the degree of parallelism there is WithDegreeOfParallelism, and to keep the items in their original order there is AsOrdered. Let's combine all of these in a single extension method for convenience, to avoid repeating them over and over again:

public static ParallelQuery<TSource> BeginPipelineBlock<TSource>(
    this IEnumerable<TSource> source, int degreeOfParallelism)
{
    return Partitioner
        .Create(source, EnumerablePartitionerOptions.NoBuffering)
        .AsParallel()
        .AsOrdered()
        .WithDegreeOfParallelism(degreeOfParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered);
}

The reason for the Partitioner configured with NoBuffering is to ensure that PLINQ enumerates the source in its natural order, one item at a time. Without it, PLINQ uses some fancy partitioning strategies that are not suitable for this usage.
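
As a quick sanity check of the ordering behavior, here is a small self-contained sketch. It repeats the BeginPipelineBlock method from above so that it compiles on its own; the OrderDemo class and its SquaresInOrder method are hypothetical names used only for this illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class PipelineExtensions
{
    // Same method as in the answer, repeated so the sketch is self-contained.
    public static ParallelQuery<TSource> BeginPipelineBlock<TSource>(
        this IEnumerable<TSource> source, int degreeOfParallelism)
    {
        return Partitioner
            .Create(source, EnumerablePartitionerOptions.NoBuffering)
            .AsParallel()
            .AsOrdered()
            .WithDegreeOfParallelism(degreeOfParallelism)
            .WithMergeOptions(ParallelMergeOptions.NotBuffered);
    }
}

public static class OrderDemo
{
    // Squares 1..count on up to three worker threads.
    // Because of AsOrdered, the results come out in source order.
    public static List<int> SquaresInOrder(int count)
    {
        return Enumerable.Range(1, count)
            .BeginPipelineBlock(degreeOfParallelism: 3)
            .Select(x => x * x)
            .ToList();
    }
}
```

Calling OrderDemo.SquaresInOrder(5) should yield 1, 4, 9, 16, 25 in that order, even though the individual squares may be computed on different threads.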

Now your pipeline can be constructed fluently like this:

files
    .BeginPipelineBlock(degreeOfParallelism: 2)
    .SelectMany(file => DownloadFileRecords(file))
    .Buffer(1000)
    .BeginPipelineBlock(degreeOfParallelism: 3)
    .Select(batch => CreateDataTable(batch))
    .BeginPipelineBlock(degreeOfParallelism: 1)
    .ForAll(dataTable => SaveDataTable(dataTable));
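
The three helper methods in this pipeline are yours to write. As an illustration of the middle one, here is a minimal sketch of CreateDataTable. The record layout ("<integer id>,<name>") and the column names are pure assumptions made for the example; adapt them to whatever your blob files actually contain.

```csharp
using System.Collections.Generic;
using System.Data;
using System.Globalization;

public static class DataTableSketch
{
    // Builds one DataTable from one batch of raw records.
    // ASSUMPTION: each record looks like "42,Some name" (id, then name).
    public static DataTable CreateDataTable(IList<string> batch)
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));      // hypothetical column
        table.Columns.Add("Name", typeof(string)); // hypothetical column

        foreach (var record in batch)
        {
            // Split at the first comma only, so names may contain commas.
            var parts = record.Split(new[] { ',' }, 2);
            table.Rows.Add(
                int.Parse(parts[0], CultureInfo.InvariantCulture),
                parts[1]);
        }
        return table;
    }
}
```

Each call creates a fresh DataTable and touches no shared state, which is what allows this block to run with a degreeOfParallelism greater than 1.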

The Buffer operator exists in the System.Interactive package, and combines single records into batches:

public static IEnumerable<IList<TSource>> Buffer<TSource>(
    this IEnumerable<TSource> source, int count);

An operator named Batch with the same functionality exists in the MoreLinq package. If you don't want the dependency, you can grab the source code and embed it directly in your project.
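
If you would rather not take either dependency, a batching operator is small enough to write by hand. The following is a minimal sketch with the same signature as the Buffer operator shown above. It is not the System.Interactive or MoreLinq source code, and the BatchingExtensions class name is just a placeholder.

```csharp
using System;
using System.Collections.Generic;

public static class BatchingExtensions
{
    // Groups the source into lists of `count` items; the last list may be shorter.
    public static IEnumerable<IList<TSource>> Buffer<TSource>(
        this IEnumerable<TSource> source, int count)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));
        return Iterator();

        // Local iterator, so the argument checks above run eagerly.
        IEnumerable<IList<TSource>> Iterator()
        {
            var batch = new List<TSource>(count);
            foreach (var item in source)
            {
                batch.Add(item);
                if (batch.Count == count)
                {
                    yield return batch;
                    batch = new List<TSource>(count); // never reuse a list already handed out
                }
            }
            if (batch.Count > 0) yield return batch; // trailing partial batch
        }
    }
}
```

A new list is allocated for every batch on purpose: in a pipeline the previous batch may still be in use by a downstream block when the next one is being filled. If you also reference System.Interactive, rename the method to avoid an ambiguous call.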

Important: If you use the above technique to build the pipeline, you should avoid configuring two consecutive blocks with degreeOfParallelism: 1. This is because of how PLINQ works. This library does not depend only on background threads, but it also uses the current thread as a worker thread. So if two (or more) consecutive pipeline blocks are configured with degreeOfParallelism: 1, they will all attempt to execute their workload in the current thread, blocking each other, and defeating the whole purpose of task-parallelism.

This shows that PLINQ is not intended to be used as a pipeline infrastructure, and using it as such imposes some limitations. So if it makes sense for your pipeline to have consecutive blocks with degreeOfParallelism: 1, PLINQ is no longer a viable option, and you should look for alternatives, like the aforementioned TPL Dataflow library.

Update: It is actually possible to link consecutive blocks having degreeOfParallelism: 1 without squeezing them into a single thread, by offloading the enumeration of the source to another thread. This way each block will run on a different thread. Below is an implementation of an OffloadEnumeration method that is based on a Channel<T>:

private static IEnumerable<T> OffloadEnumeration<T>(
    IEnumerable<T> source, int boundedCapacity)
{
    var channel = Channel.CreateBounded<T>(boundedCapacity);
    var cts = new CancellationTokenSource();
    var task = Task.Run(async () =>
    {
        try
        {
            foreach (var item in source)
                while (!channel.Writer.TryWrite(item))
                    if (!await channel.Writer.WaitToWriteAsync(cts.Token))
                        throw new ChannelClosedException(); // Should never happen
            channel.Writer.Complete();
        }
        catch (Exception ex) { channel.Writer.Complete(ex); }
    });
    try
    {
        while (channel.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult())
            while (channel.Reader.TryRead(out var item))
                yield return item;
    }
    finally { cts.Cancel(); }
}

This method should be invoked at the beginning of each block:

public static ParallelQuery<TSource> BeginPipelineBlock<TSource>(
    this IEnumerable<TSource> source, int degreeOfParallelism)
{
    source = OffloadEnumeration(source, degreeOfParallelism * 10);
    return Partitioner
        .Create(source, EnumerablePartitionerOptions.NoBuffering)
        .AsParallel()
        .AsOrdered()
        .WithDegreeOfParallelism(degreeOfParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered);
}

This is really only useful when the previous block has degreeOfParallelism: 1, but calling it unconditionally shouldn't add much overhead (assuming that the workload of each block is fairly chunky).
