Parallelization of CPU bound task continuing with IO bound

Question

I'm trying to figure out a good way to parallelize code that processes big datasets and then imports the resulting data into RavenDb.

The data processing is CPU-bound and the database import is IO-bound.

I'm looking for a solution that does the processing in parallel on Environment.ProcessorCount threads. The resulting data should then be imported into RavenDb on x (let's say 10) pooled threads, in parallel with the above process.

The main thing here is that I want the processing to continue while completed data is being imported, so that processing of the next dataset continues while the previous import is still completing.

Another issue is that the memory for each batch needs to be discarded after a successful import, as the private working memory can easily exceed 5 GB.

The code below is what I've got so far. Note that it does not fulfill the parallelization requirements outlined above.

datasupplier.GetDataItems()
    .Partition(batchSize) // not a System.Linq operator; presumably the asker's own batching extension
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        });
    });

GetDataItems yields an enumerable of data items that are partitioned into batches. GetDataItems will yield ~2,000,000 items, each taking around 0.3 ms on average to process.
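A rough CPU budget from the figures above, assuming the 0.3 ms per-item average holds and ignoring scheduling overhead (the batch size of 10,000 is only an illustrative value):

```latex
\begin{aligned}
T_{\text{serial}} &= 2\,000\,000 \times 0.3\ \text{ms} = 600\ \text{s} = 10\ \text{min} \\
T_{\text{ideal}}(8\ \text{threads}) &= \frac{600\ \text{s}}{8} = 75\ \text{s} \\
N_{\text{batches}} &= \frac{2\,000\,000}{10\,000} = 200
\end{aligned}
```

The 75 s figure is a lower bound: on a typical i7 the eight logical processors are hyper-threads on four physical cores, so the real speedup for CPU-bound work is likely below 8×.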

The project is running on the latest .NET 4.5 RC on an x64 platform.

Update.

My current code (seen above) fetches items and partitions them into batches. Each batch is processed in parallel on eight threads (Environment.ProcessorCount on an i7). The processing is slow, CPU-bound and memory-intensive. When processing of a single batch is complete, a task is started to asynchronously import the resulting data into RavenDb. The batch import job itself is synchronous and looks like:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

There are a few problems with this approach:

  1. Every time a batch is completed, a task is started to run the import job. I want to limit the number of tasks that run in parallel (e.g. max 10). Additionally, even though many tasks are started, they never seem to run in parallel.

  2. Memory allocations are a huge problem. Once a batch is processed/imported, it still seems to remain in memory.
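One way to get the "max 10 import tasks" limit from problem 1 while keeping one task per batch is to gate task creation with a SemaphoreSlim. This is a sketch (C# 9+ top-level program) under stated assumptions: ImportBatch is a hypothetical stand-in for the RavenDB session loop, and the batches are fake integer lists.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch: cap the number of concurrently running import tasks with a SemaphoreSlim.
// ImportBatch is hypothetical; it stands in for the session.Store/SaveChanges loop.
const int MaxConcurrentImports = 10;
var gate = new SemaphoreSlim(MaxConcurrentImports);
int running = 0, peak = 0;
var peakLock = new object();
var imports = new List<Task>();

// 50 fake batches of 100 items each stand in for the processed batches.
foreach (var batch in Enumerable.Range(0, 50).Select(b => Enumerable.Range(b * 100, 100).ToList()))
{
    gate.Wait(); // blocks the producer while 10 imports are already in flight
    imports.Add(Task.Run(() =>
    {
        try
        {
            int now = Interlocked.Increment(ref running);
            lock (peakLock) peak = Math.Max(peak, now);
            ImportBatch(batch);
        }
        finally
        {
            Interlocked.Decrement(ref running);
            gate.Release(); // frees a slot for the next batch
        }
    }));
}

Task.WaitAll(imports.ToArray());
Console.WriteLine($"Peak concurrent imports: {peak}");

// Hypothetical import that simulates IO-bound latency.
static void ImportBatch(List<int> batch) => Thread.Sleep(20);
```

If gate.Wait() is called from inside the PLINQ ForAll in the question's code, a processing thread pauses whenever ten imports are already in flight, which also caps how many finished batches are held in memory at once.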

I'm looking for ways to take care of the issues outlined above. Ideally I want:

  • One thread per logical processor doing heavy lifting processing batches of data.
  • Ten or so parallel threads waiting for completed batches to import into RavenDb.
  • To keep memory allocations to a minimum, which means releasing each batch after its import task is complete.
  • To not run import jobs on any of the batch-processing threads. Import of completed batches should run in parallel with processing of the next batch.

Solution

var batchSize = 10000;
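// Hand-off queue between the CPU-bound producers and the IO-bound importers (unbounded here).
var bc = new BlockingCollection<List<Data>>();
// Consumer: imports batches as they arrive. Without WithDegreeOfParallelism, PLINQ
// uses its default degree of parallelism (the number of logical processors).
var importTask = Task.Run(() =>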
{
    bc.GetConsumingEnumerable()
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(batch =>
        {
            using (var session = Store.OpenSession())
            {
                foreach (var i in batch) session.Store(i);
                session.SaveChanges();
            }
        });
});
// Producer: CPU-bound processing with one worker per logical processor.
var processTask = Task.Run(() =>
{
    datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .ForAll(batch =>
        {
            bc.Add(batch.Select(i => new Data()
            {
                ...
            }).ToList());
        });
});

processTask.Wait();
bc.CompleteAdding(); // lets the importers' GetConsumingEnumerable finish once the queue drains
importTask.Wait();
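For comparison with the solution above, here is a sketch (C# 9+ top-level program) of the same pipeline with two changes: the BlockingCollection is bounded, so processing pauses when too many finished batches are waiting, and exactly ten long-lived importer tasks each pull one batch at a time. ProcessBatch and ImportBatch are hypothetical stand-ins for the question's CPU-bound transform and RavenDB session loop, and the batch contents are fake integers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch: bounded producer/consumer pipeline with a fixed number of importers.
const int ImporterCount = 10;
const int MaxQueuedBatches = 20; // caps memory held by finished-but-not-yet-imported batches

var queue = new BlockingCollection<List<int>>(boundedCapacity: MaxQueuedBatches);
long importedItems = 0;

// Ten long-lived consumers; each takes a single batch at a time.
var importers = Enumerable.Range(0, ImporterCount)
    .Select(_ => Task.Run(() =>
    {
        foreach (var batch in queue.GetConsumingEnumerable())
        {
            ImportBatch(batch);
            Interlocked.Add(ref importedItems, batch.Count);
            // nothing here holds on to batch after this iteration, so it becomes collectible
        }
    }))
    .ToArray();

try
{
    // CPU-bound stage: at most one worker per logical processor.
    var batches = Enumerable.Range(0, 200).Select(b => Enumerable.Range(b * 1000, 1000));
    Parallel.ForEach(
        batches,
        new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
        batch => queue.Add(ProcessBatch(batch))); // Add blocks while the queue is full
}
finally
{
    queue.CompleteAdding(); // lets the consumers' foreach loops end after draining
}

Task.WaitAll(importers);
Console.WriteLine($"Imported {importedItems} items");

// Hypothetical CPU-bound transform and IO-bound import.
static List<int> ProcessBatch(IEnumerable<int> items) => items.Select(i => i * 2).ToList();
static void ImportBatch(List<int> batch) => Thread.Sleep(5);
```

In the posted solution, the PLINQ consumer runs at the default degree of parallelism unless WithDegreeOfParallelism is added, and PLINQ's chunk partitioning may let one worker pull several batches from GetConsumingEnumerable before importing them. Explicit consumer loops sidestep both questions at the cost of a few more lines.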

Solution

Your task overall sounds like a producer-consumer workflow. Your batch processors are the producers, and your RavenDB data importers are the consumers of the producers' output.

Consider using a BlockingCollection<T> as the connection between your batch processors and your db importers. The db importers will wake up as soon as the batch processors push completed batches into the blocking collection, and will go back to sleep when they have "caught up" and emptied the collection.
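A minimal sketch (C# 9+ top-level program) of the wake-up/sleep behaviour described above, using integers in place of batches: consumers block inside GetConsumingEnumerable while the collection is empty, resume as items arrive, and their loops end once CompleteAdding has been called and the collection has drained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var bc = new BlockingCollection<int>();
int consumed = 0;

// Three consumers; each blocks in GetConsumingEnumerable until an item is available.
var consumers = Enumerable.Range(0, 3)
    .Select(_ => Task.Run(() =>
    {
        foreach (var item in bc.GetConsumingEnumerable())
            Interlocked.Increment(ref consumed);
    }))
    .ToArray();

// Producer: adds items in bursts; consumers block between bursts until more items arrive.
for (int burst = 0; burst < 5; burst++)
{
    for (int i = 0; i < 10; i++) bc.Add(burst * 10 + i);
    Thread.Sleep(50);
}

bc.CompleteAdding(); // no more items: consumer loops finish once the collection is empty
Task.WaitAll(consumers);
Console.WriteLine($"Consumed {consumed} items"); // prints "Consumed 50 items"
```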

The batch processor producers can run full throttle and will always be running concurrently with the db importer tasks that process previously completed batches. If you are concerned that the batch processors may get too far ahead of the db importers (because db import takes significantly longer than processing each batch), you can set an upper bound on the blocking collection so that the producers block when they try to add beyond that limit, giving the consumers a chance to catch up.
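A sketch (C# 9+ top-level program) of the upper bound mentioned above, with string placeholders instead of real batches. A plain Add on a full bounded collection blocks; TryAdd with a zero timeout is used here only to make that state observable without hanging the example.

```csharp
using System;
using System.Collections.Concurrent;

// Bounded to 2 items: a third Add would block until a consumer takes something.
var bc = new BlockingCollection<string>(boundedCapacity: 2);

bool first = bc.TryAdd("batch-1");
bool second = bc.TryAdd("batch-2");
bool third = bc.TryAdd("batch-3", TimeSpan.Zero);  // collection full: returns false immediately

string taken = bc.Take();                           // a consumer removes the oldest batch...
bool retry = bc.TryAdd("batch-3", TimeSpan.Zero);  // ...so the producer can add again

Console.WriteLine($"{first} {second} {third} {taken} {retry}");
```

With the default ConcurrentQueue backing store (FIFO), this prints `True True False batch-1 True`.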

Some of your comments are worrisome, though. There's nothing particularly wrong with spinning up a Task instance to perform the db import asynchronously to the batch processing. Task != Thread. Creating new task instances does not have the same monumental overhead of creating new threads.
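To make the "Task != Thread" point concrete, a small sketch (C# 9+ top-level program) that starts 1,000 short tasks and records the managed thread ID each one ran on. The exact count varies by machine and run, but it is typically a small number rather than 1,000, because tasks are queued work items executed by a shared pool of threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int TaskCount = 1000;
var threadIds = new ConcurrentDictionary<int, bool>();

// Each task records the ID of the thread that executed it.
var tasks = Enumerable.Range(0, TaskCount)
    .Select(_ => Task.Run(() =>
    {
        threadIds.TryAdd(Environment.CurrentManagedThreadId, true);
        Thread.SpinWait(1000); // a tiny amount of CPU work
    }))
    .ToArray();

Task.WaitAll(tasks);
Console.WriteLine($"{TaskCount} tasks ran on {threadIds.Count} distinct threads");
```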

Don't get hung up on trying to control threads too precisely. Even if you specify that you want exactly as many buckets as you have cores, you don't get exclusive use of those cores. Hundreds of other threads from other processes will still be scheduled in between your time slices. Specify the logical units of work using Tasks and let the TPL manage the thread pool. Save yourself the frustration of a false sense of control. ;>

In your comments, you indicate that your tasks do not appear to be running async to each other (how are you determining this?) and memory does not appear to be released after each batch is finished. I'd suggest dropping everything until you can figure out what is up with those two problems first. Are you forgetting to call Dispose() somewhere? Are you holding onto a reference that is keeping a whole tree of objects alive unnecessarily? Are you measuring the right thing? Are the parallel tasks being serialized by a blocking database or network I/O? Until these two issues are resolved it doesn't matter what your parallelism plan is.
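To answer the "are you holding onto a reference?" question empirically, one diagnostic trick is to track a finished batch with a WeakReference and force a full collection. A sketch (C# 9+ top-level program); the cache list is a hypothetical example of an accidental long-lived reference, and forcing GC.Collect like this is for diagnosis only, not something to leave in the import pipeline.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

// "cache" plays the role of an accidental long-lived reference (hypothetical).
var cache = new List<object>();

WeakReference released = ImportAndTrack(null);   // nothing keeps this batch
WeakReference retained = ImportAndTrack(cache);  // the cache still references this one

GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();

bool releasedAlive = released.IsAlive;
bool retainedAlive = retained.IsAlive;
Console.WriteLine($"released batch alive: {releasedAlive}, retained batch alive: {retainedAlive}");
GC.KeepAlive(cache); // keeps the cache (and its batch) reachable through the checks above

// Builds a ~1 MB batch, "imports" it, and returns only a weak handle to it.
// NoInlining ensures the batch local lives only in this method's frame.
[MethodImpl(MethodImplOptions.NoInlining)]
static WeakReference ImportAndTrack(List<object> keepIn)
{
    var batch = new List<byte[]>();
    for (int i = 0; i < 100; i++) batch.Add(new byte[10_000]);
    keepIn?.Add(batch); // the accidental reference, when a list is passed in
    return new WeakReference(batch);
}
```

If a batch you expect to be released still reports IsAlive as true, something such as a static collection, an event subscription, or a closure captured by a long-running task still references it; a memory profiler can then show the retention path.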
