Async with huge data streams

Problem description

We use IEnumerable<T> to return huge datasets from the database:

public IEnumerable<Data> Read(...)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            yield return item;
        }
    }
}
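To make the streaming behaviour of this iterator concrete, here is a minimal sketch that replaces the database with an in-memory loop. StreamingDemo, FakeConnection and the event log are hypothetical stand-ins, not part of the question's code. It shows that rows are produced only as the caller asks for them, and that breaking out of foreach early still runs the using cleanup, which is what closes the real SqlConnection.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for SqlConnection: logs when it is opened and disposed.
public sealed class FakeConnection : IDisposable
{
    private readonly List<string> _log;
    public FakeConnection(List<string> log) { _log = log; _log.Add("open"); }
    public void Dispose() => _log.Add("close");
}

public static class StreamingDemo
{
    // Same shape as Read(...) above: a using block around a read loop with yield return.
    public static IEnumerable<int> Read(int rowCount, List<string> log)
    {
        using (var connection = new FakeConnection(log))
        {
            for (int i = 0; i < rowCount; i++)
            {
                log.Add($"read {i}");   // stands in for reader.Read()
                yield return i;         // hand one row to the caller, then pause here
            }
        }                               // Dispose runs when enumeration ends or is abandoned
    }

    // Consumes only the first n rows of a (potentially huge) stream.
    public static List<string> TakeFirst(int n, int rowCount)
    {
        var log = new List<string>();
        foreach (var row in Read(rowCount, log))
        {
            log.Add($"use {row}");
            if (row + 1 == n) break;    // foreach disposes the iterator, closing the connection
        }
        return log;
    }
}
```

Calling StreamingDemo.TakeFirst(2, 1_000_000) reads only two rows before "close" is logged; the other 999,998 rows are never materialized.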

Now we want to do the same with async methods. However, there is no async equivalent of IEnumerable<T>, so we have to collect the data into a list until the entire dataset is loaded:

public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}

This consumes a huge amount of server resources, because all the data must be in the list before the method returns. What is the best, easy-to-use async alternative to IEnumerable<T> for working with large data streams? I would like to avoid holding all the data in memory while processing it.

Recommended answer

The easiest option is to use TPL Dataflow. All you need to do is configure an ActionBlock that handles the processing (in parallel, if you wish) and send the items into it one by one asynchronously with SendAsync.
I would also suggest setting BoundedCapacity, which throttles the reader: when processing can't keep up, SendAsync waits until the block has room, so reading from the database pauses instead of buffering everything in memory.

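var block = new ActionBlock<Data>(
    data => ProcessDataAsync(data),
    new ExecutionDataflowBlockOptions
    {
        // At most 1000 items waiting or being processed; SendAsync waits when full
        BoundedCapacity = 1000,
        // Process up to one item per core at a time
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

using(var connection = new SqlConnection(...))
{
    // ...
    while(await reader.ReadAsync().ConfigureAwait(false))
    {
        // ...
        // Completes once the block has accepted the item;
        // false means the block stopped accepting items (e.g. it faulted)
        if(!await block.SendAsync(item).ConfigureAwait(false))
            break;
    }
}

// No more items are coming: let the block drain, then wait for it.
// Awaiting Completion also rethrows any exception from ProcessDataAsync.
block.Complete();
await block.Completion.ConfigureAwait(false);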
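For a version that runs on its own, here is a minimal sketch of the same ActionBlock pattern, assuming .NET 6 or later, where System.Threading.Tasks.Dataflow is part of the shared framework (older targets such as .NET Framework need the System.Threading.Tasks.Dataflow NuGet package). The database is replaced by a hypothetical loop that "reads" the integers 1..rowCount, and DataflowDemo/RunAsync are illustrative names. Besides checking that every row is processed, it tracks how many accepted rows were still unfinished at any moment, which BoundedCapacity should keep at or below the configured limit.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    public static async Task<(int Processed, long Sum, int MaxPending)> RunAsync(int rowCount, int capacity)
    {
        int sent = 0, processed = 0, maxPending = 0;
        long sum = 0;

        var block = new ActionBlock<int>(
            async item =>
            {
                await Task.Yield();                   // stands in for ProcessDataAsync(data)
                Interlocked.Add(ref sum, item);
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = capacity,           // caps items queued + in flight
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        for (int i = 1; i <= rowCount; i++)
        {
            await Task.Yield();                       // stands in for reader.ReadAsync()
            if (!await block.SendAsync(i))            // waits while the block is full
                break;                                // block stopped accepting; Completion reports why
            // Accepted rows whose processing has not finished yet.
            int pending = Interlocked.Increment(ref sent) - Volatile.Read(ref processed);
            if (pending > maxPending) maxPending = pending;
        }

        block.Complete();                             // signal end of input
        await block.Completion;                       // drain remaining rows, rethrow failures
        return (Volatile.Read(ref processed), Interlocked.Read(ref sum), maxPending);
    }
}
```

With capacity 16 and 10,000 rows, the producer is never more than 16 rows ahead of the consumers, so memory use stays flat regardless of how large rowCount is.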

You can also use Reactive Extensions, but that's a more complicated and robust framework than you probably need.
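Since this question was asked, C# 8.0 (together with .NET Core 3.0) added async streams: IAsyncEnumerable<T>, async iterators that combine await with yield return, and await foreach. They are the direct async counterpart of the IEnumerable<T> iterator in the question. The sketch below assumes C# 8 or later; AsyncStreamDemo, FakeAsyncConnection and the Task.Yield() "reads" are hypothetical stand-ins for the real SqlConnection/ReadAsync code.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for SqlConnection: logs when it is opened and disposed.
public sealed class FakeAsyncConnection : IAsyncDisposable
{
    private readonly List<string> _log;
    public FakeAsyncConnection(List<string> log) { _log = log; _log.Add("open"); }
    public ValueTask DisposeAsync() { _log.Add("close"); return default; }
}

public static class AsyncStreamDemo
{
    // Async iterator: rows are produced one at a time, nothing is buffered in a list.
    public static async IAsyncEnumerable<int> ReadAsync(
        int rowCount, List<string> log,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await using (var connection = new FakeAsyncConnection(log))
        {
            for (int i = 0; i < rowCount; i++)
            {
                await Task.Yield();                   // stands in for reader.ReadAsync()
                cancellationToken.ThrowIfCancellationRequested();
                log.Add($"read {i}");
                yield return i;                       // hand one row to the caller
            }
        }
    }

    public static async Task<List<string>> TakeFirstAsync(int n, int rowCount)
    {
        var log = new List<string>();
        await foreach (var row in ReadAsync(rowCount, log))
        {
            log.Add($"use {row}");
            if (row + 1 == n) break;                  // DisposeAsync runs, closing the "connection"
        }
        return log;
    }
}
```

As with the synchronous iterator, breaking out of await foreach early disposes the enumerator, so the connection is closed after only the rows actually consumed.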