使用异步/等待与DataReader的? (无中间缓冲区!) [英] Using async / await with DataReader ? ( without middle buffers!)

查看:222
本文介绍了使用异步/等待与DataReader的? (无中间缓冲区!)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的目标很简单,我想这样做异步I / O调用(使用异步等待) - 但是:

确定。

目前这里是我的code,它的工作是从数据库中读取和项目每行一个 Func键<>

 公开的IEnumerable< T> GetSomeData< T> (字符串SQL,Func键< IDataRecord,T>投影仪)
{
    使用(SqlConnection的_conn =新的SqlConnection(@数据源= ...))
    {
        使用(的SqlCommand _cmd =新的SqlCommand(SQL,_conn))
        {
            _conn.Open();
            _cmd.CommandTimeout = 100000;
            使用(IDataReader的RDR = _cmd.ExecuteReader())
            {
                而(rdr.Read())收益率回报投影机(RDR);
            }
        }
    }
}

那么,什么是投影机?

每个类都有它得到一个函数记录 IDataRecord ),并创建一个实体:

例如:

 公共类MyClass的
{
    公共静态MyClass的MyClassFactory(IDataRecord记录)
    {
        返回新MyClass的
        {
            名称=记录[名称]。的ToString()
            日期E = DateTime.Parse(记录[显示日期E]。的ToString()),
            VAL = decimal.Parse(记录[VAL]。的ToString())
        };
    }
    公共字符串名称{;组; }
    公开日期时间显示日期E {搞定;组; }
    公共小数VAL {搞定;组; }
}

所以在这里, MyClassFactory 将是函数功能

那么,如何我目前运行呢?

  VAR SQL = @SELECT TOP 1000 [名],[显示日期E],[VAL] FROM [WebERP] [DBO] [T]。
 VAR一个= GetSomeData< MyClass的> (SQL,MyClass.MyClassFactory)。凡(...); //注意函数功能

一切ok。

现在开始的问题:

添加异步来的方法产生一个错误:(是的,我知道,IEnumerable的是一个的同步的接口,因此这个问题)

公共异步任务<&IEnumerable的LT; T>> GetSomeData< T> (字符串SQL,Func键< IDataRecord,T>投影仪)


  

不能是迭代器块,因为
  System.Threading.Tasks.Task>
  是不是一个迭代器接口类型。


但在这里这个家伙确实 - :

其中的 DOES 编译。

我如何转换我的code ,以全力支持asynchronse IO调用?

(的条件下:无数据流的依赖,送投影机功能作为参数,没有中间缓冲区)


解决方案

  

我想这样做异步I / O调用(使用异步等待) - 但是:


  
  

      
  • 如果没有(在这个答案等)使用数据流依赖

  •   
  • 无中间缓冲区(不喜欢这个答案)

  •   
  • 的投影机功能应发送作为参数。 (不喜欢这个答案)

  •   

您可能要检查斯蒂芬Toub的的任务,单子和LINQ关于如何处理异步数据序列中的一些伟大的想法。

这不是(还)可以收益等待,但我将是一个verbalist结合此处引用的要求没有列出的IEnumerable 和LINQ。所以,这里的形为两个协同程序(几乎未经测试)。

一个可能的解决方案

数据制作程序(对应于 IEnumarable 收益

 公共异步任务GetSomeDataAsync< T>(
    SQL字符串,Func键< IDataRecord,T>投影仪,ProducerConsumerHub< T>枢纽)
{
    使用(SqlConnection的_conn =新的SqlConnection(@数据源= ...))
    {
        使用(的SqlCommand _cmd =新的SqlCommand(SQL,_conn))
        {
            等待_conn.OpenAsync();
            _cmd.CommandTimeout = 100000;
            使用(VAR RDR =等待_cmd.ExecuteReaderAsync())
            {
                而(等待rdr.ReadAsync())
                    等待hub.ProduceAsync(投影仪(RDR));
            }
        }
    }
}

数据消费程序(对应于的foreach 或LINQ前pression):

 公共异步任务ConsumeSomeDataAsync(SQL字符串)
{
    VAR枢纽=新ProducerConsumerHub< IDataRecord>();
    VAR producerTask = GetSomeDataAsync(SQL,RDR =>阅读器,集线器);    而(真)
    {
        变种nextItemTask = hub.ConsumeAsync();
        等待Task.WhenAny(producerTask,nextItemTask);        如果(nextItemTask.IsCompleted)
        {
            //处理下一个数据项
            Console.WriteLine(等待nextItemTask);
        }        如果(producerTask.IsCompleted)
        {
            //处理序列结束
            等待producerTask;
            打破;
        }
    }
}

协程执行辅助(也可被实现为一对的自定义awaiters 中):

 公共类ProducerConsumerHub< T>
{
    TaskCompletionSource<&空GT; _消费=新TaskCompletionSource<&空GT;();
    TaskCompletionSource< T> _producer =新TaskCompletionSource< T>();    // TODO:让线程安全
    公共异步任务ProduceAsync(T数据)
    {
        _producer.SetResult(数据);
        等待_consumer.Task;
        _消费=新TaskCompletionSource<&空GT;();
    }    公共异步任务< T> ConsumeAsync()
    {
        VAR数据=等待_producer.Task;
        _producer =新TaskCompletionSource< T>();
        _consumer.SetResult(Empty.Value);
        返回的数据;
    }    结构空{公共静态只读空值=默认值(空); }
}

这只是一个想法。它可能是这样一个简单的任务矫枉过正,故能在某些区域(改进象线程安全,竞争条件和处理序列结束而不触及 producerTask )。然而,它说明了如何异步数据检索和处理也可能会被分离出来。

My goal is simple , I want to do Asynchronous I/O calls (using async await) - but :

Ok.

Currently here is my code which it's job is to read from db and project each line to a Func<>

public IEnumerable < T > GetSomeData < T > (string sql, Func < IDataRecord, T > projector)
{
    using(SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using(SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            _conn.Open();
            _cmd.CommandTimeout = 100000;
            using(IDataReader rdr = _cmd.ExecuteReader())
            {
                while (rdr.Read())  yield    return projector(rdr);
            }
        }
    }
}

So , what is projector ?

Each class has a function which gets a record ( IDataRecord) and create an entity :

Example :

public class MyClass
{
    public static MyClass MyClassFactory(IDataRecord record)
    {
        return new MyClass
        {
            Name = record["Name"].ToString(),
            Datee = DateTime.Parse(record["Datee"].ToString()),
            val = decimal.Parse(record["val"].ToString())
        };
    }
    public string Name    {   get;   set;  }
    public DateTime Datee    {  get;     set;  }
    public decimal val    {  get;    set;    }
}

So here , MyClassFactory would be the Func

So how I currently run it ?

 var sql = @"SELECT TOP 1000 [NAME],[datee] ,[val]  FROM [WebERP].[dbo].[t]";
 var a = GetSomeData < MyClass > (sql, MyClass.MyClassFactory).Where(...); //notice the Func

All ok.

The problems starts now :

Adding async to the method yields an error : ( Yes I know that Ienumerable is a Synchronous interface hence the problem)

public async Task<IEnumerable < T >> GetSomeData < T > (string sql, Func < IDataRecord, T > projector)

cannot be an iterator block because 'System.Threading.Tasks.Task>' is not an iterator interface type

But this guy here did - :

Which DOES compile.

Question

How can I convert my code to support fully asynchronse IO call ?

(under the conditions : without DataFlow dependency , send projector function as argument , no middle buffers)

解决方案

I want to do Asynchronous I/O calls (using async await) - but :

  • Without using DataFlow dependency ( like in this answer)
  • Without middle buffers( not like this answer)
  • The Projector function should be sent as an argument. ( not like this answer)

You may want to check Stephen Toub's "Tasks, Monads, and LINQ" for some great ideas on how to process asynchronous data sequences.

It's not (yet) possible to combine yield and await, but I'm going to be a verbalist here: the quoted requirements didn't list IEnumerable and LINQ. So, here's a possible solution shaped as two coroutines (almost untested).

Data producer routine (corresponds to IEnumarable with yield):

public async Task GetSomeDataAsync<T>(
    string sql, Func<IDataRecord, T> projector, ProducerConsumerHub<T> hub)
{
    using (SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using (SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            await _conn.OpenAsync();
            _cmd.CommandTimeout = 100000;
            using (var rdr = await _cmd.ExecuteReaderAsync())
            {
                while (await rdr.ReadAsync())
                    await hub.ProduceAsync(projector(rdr));
            }
        }
    }
}

Data consumer routine (correspond to foreach or a LINQ expression):

public async Task ConsumeSomeDataAsync(string sql)
{
    var hub = new ProducerConsumerHub<IDataRecord>();
    var producerTask = GetSomeDataAsync(sql, rdr => rdr, hub);

    while (true)
    {
        var nextItemTask = hub.ConsumeAsync();
        await Task.WhenAny(producerTask, nextItemTask);

        if (nextItemTask.IsCompleted)
        {
            // process the next data item
            Console.WriteLine(await nextItemTask);
        }

        if (producerTask.IsCompleted)
        {
            // process the end of sequence
            await producerTask;
            break;
        }
    }
}

Coroutine execution helper (can also be implemented as a pair of custom awaiters):

public class ProducerConsumerHub<T>
{
    TaskCompletionSource<Empty> _consumer = new TaskCompletionSource<Empty>();
    TaskCompletionSource<T> _producer = new TaskCompletionSource<T>();

    // TODO: make thread-safe
    public async Task ProduceAsync(T data)
    {
        _producer.SetResult(data);
        await _consumer.Task;
        _consumer = new TaskCompletionSource<Empty>();
    }

    public async Task<T> ConsumeAsync()
    {
        var data = await _producer.Task;
        _producer = new TaskCompletionSource<T>();
        _consumer.SetResult(Empty.Value);
        return data;
    }

    struct Empty { public static readonly Empty Value = default(Empty); }
}

This is just an idea. It might be an overkill for a simple task like this, and it could be improved in some areas (like thread-safety, race conditions and handling the end of the sequence without touching producerTask). Yet it illustrates how the asynchronous data retrieval and processing could possibly be decoupled.

这篇关于使用异步/等待与DataReader的? (无中间缓冲区!)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆