How to use the Rx.NET extension ForEachAsync with an async action

Question

I have code which streams data down from SQL and writes it to a different store. The code is approximately this:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        var list = new List<MyData>();
        while (await reader.ReadAsync())
        {
            var row = GetRow(reader);
            list.Add(row);
            if (list.Count == BatchSize)
            {
                await WriteDataAsync(list);
                list.Clear();
            }
        }
        if (list.Count > 0)
        {
            await WriteDataAsync(list);
        }
    }
}

I would like to use Reactive extensions for this purpose instead. Ideally the code would look like this:

await StreamDataFromSql()
    .Buffer(BatchSize)
    .ForEachAsync(async batch => await WriteDataAsync(batch));

However, it seems that the extension method ForEachAsync only accepts synchronous actions. Would it be possible to write an extension which would accept an async action?

Recommended answer

Would it be possible to write an extension which would accept an async action?

Not directly.

Rx subscriptions are necessarily synchronous because Rx is a push-based system. When a data item arrives, it travels through your query until it hits the final subscription - which in this case is to execute an Action.

The await-able methods provided by Rx are awaiting the sequence itself - i.e., ForEachAsync is asynchronous in terms of the sequence (you are asynchronously waiting for the sequence to complete), but the subscription within ForEachAsync (the action taken for each element) must still be synchronous.
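To see why an async lambda does not help here, the sketch below uses only the base class library (no Rx). It assumes the per-element callback is typed as Action<T>, as the question observes for ForEachAsync. The names AsyncVoidDemo, gate, and done are hypothetical and exist only for this illustration. An async lambda assigned to Action<T> compiles as "async void", so the caller gets control back at the first await and has nothing to wait on.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns a log showing the order in which the steps ran.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<bool>();  // stands in for a slow write
        var done = new TaskCompletionSource<bool>();  // lets the demo wait at the end

        // Assigned to Action<int>, the async lambda becomes "async void":
        // no Task is returned, so the caller cannot await the write.
        Action<int> action = async item =>
        {
            log.Add($"write {item} started");
            await gate.Task;
            log.Add($"write {item} finished");
            done.SetResult(true);
        };

        action(1);                   // returns at the first await
        log.Add("action returned");  // runs before the write has finished

        gate.SetResult(true);        // let the pending write complete
        await done.Task;
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
            Console.WriteLine(line);
        // Prints:
        // write 1 started
        // action returned
        // write 1 finished
    }
}
```

The same thing would happen for each element handed to a synchronous callback: the writes are started but not awaited, so the sequence can complete while writes are still in flight.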

In order to do a sync-to-async transition in your data pipeline, you'll need to have a buffer. An Rx subscription can (synchronously) add to the buffer as a producer while an asynchronous consumer is retrieving items and processing them. So, you'd need a producer/consumer queue that supports both synchronous and asynchronous operations.
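As a rough illustration of such a queue, here is a minimal sketch that uses System.Threading.Channels instead of a TPL Dataflow block. This is a swapped-in technique chosen because it ships with the base class library on .NET Core 3.0 and later; it is not what the answer itself proposes. The producer side calls the synchronous TryWrite, which is the kind of call an Rx OnNext handler could make, while the consumer awaits each write. The class name, the int payload, and this version of WriteDataAsync are hypothetical stand-ins.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SyncToAsyncQueueDemo
{
    // Hypothetical stand-in for the question's WriteDataAsync.
    private static async Task WriteDataAsync(IList<int> batch, List<int> store)
    {
        await Task.Yield();        // simulate asynchronous I/O
        store.AddRange(batch);
    }

    public static async Task<List<int>> RunAsync()
    {
        var store = new List<int>();
        var queue = Channel.CreateUnbounded<IList<int>>();

        // Asynchronous consumer: finishes one write before taking the next batch.
        Task consumer = Task.Run(async () =>
        {
            await foreach (var batch in queue.Reader.ReadAllAsync())
                await WriteDataAsync(batch, store);
        });

        // Synchronous producer: TryWrite never awaits.
        queue.Writer.TryWrite(new[] { 1, 2 });
        queue.Writer.TryWrite(new[] { 3, 4 });
        queue.Writer.TryWrite(new[] { 5 });
        queue.Writer.Complete();   // plays the role of OnCompleted

        await consumer;            // all queued batches have been written
        return store;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync()));  // prints 1,2,3,4,5
    }
}
```

Because the channel is unbounded, the producer is never slowed down; the queue simply grows if the consumer falls behind.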

The various block types in TPL Dataflow can satisfy this need. Something like this should suffice:

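// Batch the rows coming from SQL.
var obs = StreamDataFromSql().Buffer(BatchSize);
// The ActionBlock queues incoming batches and awaits each write before starting the next.
var buffer = new ActionBlock<IList<MyData>>(batch => WriteDataAsync(batch));
// AsObserver forwards each batch to the block and completes it when the sequence ends.
using (var subscription = obs.Subscribe(buffer.AsObserver()))
  await buffer.Completion;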

Note that there is no backpressure; as quickly as StreamDataFromSql can push data, it'll be buffered and stored in the incoming queue of the ActionBlock. Depending on the size and type of data, this can quickly use a lot of memory.
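If the unbounded queue is a concern, one option is to give the block a BoundedCapacity and feed it with SendAsync, which does not complete until the block has room for the item. The sketch below shows that mechanism on its own, with a plain loop as the producer rather than an Rx subscription; how an observer created by AsObserver behaves against a bounded block is not covered here and would need to be checked separately. The class name, the delay, and the int payload are hypothetical, and depending on the target framework the System.Threading.Tasks.Dataflow NuGet package may be needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBlockDemo
{
    public static async Task<int> RunAsync()
    {
        int written = 0;

        // At most 2 batches may be held by the block at any time
        // (illustrative value, not a recommendation).
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };

        var block = new ActionBlock<IList<int>>(
            async batch =>
            {
                await Task.Delay(10);      // simulate a slow asynchronous write
                written += batch.Count;    // safe: the block runs one batch at a time by default
            },
            options);

        for (int i = 0; i < 10; i++)
        {
            IList<int> batch = new[] { i, i, i };
            // Waits (asynchronously) while the block is full, so the
            // producer cannot run arbitrarily far ahead of the consumer.
            await block.SendAsync(batch);
        }

        block.Complete();
        await block.Completion;            // all accepted batches are processed
        return written;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());  // prints 30
    }
}
```

The trade-off is that the producer now has to be something that can wait, which a purely push-based source cannot do without blocking a thread.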
