将 async/await 与 DataReader 一起使用?(没有中间缓冲区!) [英] Using async / await with DataReader ? ( without middle buffers!)

查看:34
本文介绍了将 async/await 与 DataReader 一起使用?(没有中间缓冲区!)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的目标很简单,我想做异步 I/O 调用(使用异步等待) - 但是:

  • 不使用 DataFlow 依赖项( 关于如何处理异步数据序列的一些好主意.

    (还)不可能将 yieldawait 结合起来,但我将在这里做一个口头说明:引用的要求没有列出 IEnumerable 和 LINQ.所以,这里有一个可能的解决方案,形状为两个协程(几乎未经测试).

    数据生成器例程(对应于 IEnumarableyield):

    公共异步任务 GetSomeDataAsync(字符串 sql, Func投影仪、ProducerConsumerHub<T>中心){using (SqlConnection _conn = new SqlConnection(@"Data Source=...")){使用 (SqlCommand _cmd = new SqlCommand(sql, _conn)){等待 _conn.OpenAsync();_cmd.CommandTimeout = 100000;使用 (var rdr = await _cmd.ExecuteReaderAsync()){而(等待 rdr.ReadAsync())等待 hub.ProduceAsync(projector(rdr));}}}}

    数据消费者例程(对应于foreach或LINQ表达式):

    public async Task ConsumeSomeDataAsync(string sql){var hub = new ProducerConsumerHub();var producerTask = GetSomeDataAsync(sql, rdr => rdr, hub);而(真){var nextItemTask = hub.ConsumeAsync();await Task.WhenAny(producerTask, nextItemTask);如果(nextItemTask.IsCompleted){//处理下一个数据项Console.WriteLine(await nextItemTask);}如果(producerTask.IsCompleted){//处理序列的结尾等待生产者任务;休息;}}}

    协程执行助手(也可以实现为一对自定义等待者):

    公共类ProducerConsumerHub{TaskCompletionSource_consumer = new TaskCompletionSource();TaskCompletionSource_producer = new TaskCompletionSource();//TODO: 使线程安全公共异步任务 ProduceAsync(T 数据){_producer.SetResult(data);等待 _consumer.Task;_consumer = new TaskCompletionSource();}公共异步任务<T>消费异步(){var data = await _producer.Task;_producer = new TaskCompletionSource();_consumer.SetResult(Empty.Value);返回数据;}struct Empty { public static readonly Empty Value = default(Empty);}}

    这只是一个想法.对于像这样的简单任务来说,这可能是一种矫枉过正,并且可以在某些方面进行改进(例如线程安全、竞争条件和在不触及 producerTask 的情况下处理序列的末尾).然而,它说明了异步数据检索和处理是如何解耦的.

    My goal is simple , I want to do Asynchronous I/O calls (using async await) - but :

    Ok.

    Currently here is my code which it's job is to read from db and project each line to a Func<>

    public IEnumerable < T > GetSomeData < T > (string sql, Func < IDataRecord, T > projector)
    {
        using(SqlConnection _conn = new SqlConnection(@"Data Source=..."))
        {
            using(SqlCommand _cmd = new SqlCommand(sql, _conn))
            {
                _conn.Open();
                _cmd.CommandTimeout = 100000;
                using(IDataReader rdr = _cmd.ExecuteReader())
                {
                    while (rdr.Read())  yield    return projector(rdr);
                }
            }
        }
    }
    

    So , what is projector ?

    Each class has a function which gets a record ( IDataRecord) and create an entity :

    Example :

    public class MyClass
    {
        public static MyClass MyClassFactory(IDataRecord record)
        {
            return new MyClass
            {
                Name = record["Name"].ToString(),
                Datee = DateTime.Parse(record["Datee"].ToString()),
                val = decimal.Parse(record["val"].ToString())
            };
        }
        public string Name    {   get;   set;  }
        public DateTime Datee    {  get;     set;  }
        public decimal val    {  get;    set;    }
    }
    

    So here , MyClassFactory would be the Func

    So how I currently run it ?

     var sql = @"SELECT TOP 1000 [NAME],[datee] ,[val]  FROM [WebERP].[dbo].[t]";
     var a = GetSomeData < MyClass > (sql, MyClass.MyClassFactory).Where(...); //notice the Func
    

    All ok.

    The problems starts now :

    Adding async to the method yields an error : ( Yes I know that Ienumerable is a Synchronous interface hence the problem)

    public async Task<IEnumerable < T >> GetSomeData < T > (string sql, Func < IDataRecord, T > projector)

    cannot be an iterator block because 'System.Threading.Tasks.Task>' is not an iterator interface type

    But this guy here did - :

    Which DOES compile.

    Question

    How can I convert my code to support fully asynchronse IO call ?

    (under the conditions : without DataFlow dependency , send projector function as argument , no middle buffers)

    解决方案

    I want to do Asynchronous I/O calls (using async await) - but :

    • Without using DataFlow dependency ( like in this answer)
    • Without middle buffers( not like this answer)
    • The Projector function should be sent as an argument. ( not like this answer)

    You may want to check Stephen Toub's "Tasks, Monads, and LINQ" for some great ideas on how to process asynchronous data sequences.

    It's not (yet) possible to combine yield and await, but I'm going to be a verbalist here: the quoted requirements didn't list IEnumerable and LINQ. So, here's a possible solution shaped as two coroutines (almost untested).

    Data producer routine (corresponds to IEnumarable with yield):

    public async Task GetSomeDataAsync<T>(
        string sql, Func<IDataRecord, T> projector, ProducerConsumerHub<T> hub)
    {
        using (SqlConnection _conn = new SqlConnection(@"Data Source=..."))
        {
            using (SqlCommand _cmd = new SqlCommand(sql, _conn))
            {
                await _conn.OpenAsync();
                _cmd.CommandTimeout = 100000;
                using (var rdr = await _cmd.ExecuteReaderAsync())
                {
                    while (await rdr.ReadAsync())
                        await hub.ProduceAsync(projector(rdr));
                }
            }
        }
    }
    

    Data consumer routine (correspond to foreach or a LINQ expression):

    public async Task ConsumeSomeDataAsync(string sql)
    {
        var hub = new ProducerConsumerHub<IDataRecord>();
        var producerTask = GetSomeDataAsync(sql, rdr => rdr, hub);
    
        while (true)
        {
            var nextItemTask = hub.ConsumeAsync();
            await Task.WhenAny(producerTask, nextItemTask);
    
            if (nextItemTask.IsCompleted)
            {
                // process the next data item
                Console.WriteLine(await nextItemTask);
            }
    
            if (producerTask.IsCompleted)
            {
                // process the end of sequence
                await producerTask;
                break;
            }
        }
    }
    

    Coroutine execution helper (can also be implemented as a pair of custom awaiters):

    public class ProducerConsumerHub<T>
    {
        TaskCompletionSource<Empty> _consumer = new TaskCompletionSource<Empty>();
        TaskCompletionSource<T> _producer = new TaskCompletionSource<T>();
    
        // TODO: make thread-safe
        public async Task ProduceAsync(T data)
        {
            _producer.SetResult(data);
            await _consumer.Task;
            _consumer = new TaskCompletionSource<Empty>();
        }
    
        public async Task<T> ConsumeAsync()
        {
            var data = await _producer.Task;
            _producer = new TaskCompletionSource<T>();
            _consumer.SetResult(Empty.Value);
            return data;
        }
    
        struct Empty { public static readonly Empty Value = default(Empty); }
    }
    

    This is just an idea. It might be an overkill for a simple task like this, and it could be improved in some areas (like thread-safety, race conditions and handling the end of the sequence without touching producerTask). Yet it illustrates how the asynchronous data retrieval and processing could possibly be decoupled.

    这篇关于将 async/await 与 DataReader 一起使用?(没有中间缓冲区!)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆