For a TPL Dataflow: How do I get my hands on all the output produced by a TransformBlock while blocking until all inputs have been processed?

Problem description

I'm submitting a series of select statements (queries - thousands of them) to a single database synchronously and getting back one DataTable per query (Note: This program is such that it has knowledge of the DB schema it is scanning only at run time, hence the use of DataTables). The program runs on a client machine and connects to DBs on a remote machine. It takes a long time to run so many queries. So, assuming that executing them async or in parallel will speed things up, I'm exploring TPL Dataflow (TDF). I want to use the TDF library because it seems to handle all of the concerns related to writing multi-threaded code that would otherwise need to be done by hand.

The code shown is based on http://blog.i3arnon.com/2016/05/23/tpl-dataflow/. It's minimal and is just to help me understand the basic operations of TDF. Please do know I've read many blogs and coded many iterations trying to crack this nut.

Nonetheless, with this current iteration, I have one problem and one question:

Problem

The code is inside a button click method (Using a UI, a user selects a machine, a sql instance, and a database, and then kicks off the scan). The two lines with the await operator return an error at build time: The 'await' operator can only be used within an async method. Consider marking this method with the 'async' modifier and changing its return type to 'Task'. I can't change the return type of the button click method. Do I need to somehow isolate the button click method from the async-await code?

Question

Although I've found beau-coup write-ups describing the basics of TDF, I can't find an example of how to get my hands on the output that each invocation of the TransformBlock produces (i.e., a DataTable). Although I want to submit the queries async, I do need to block until all queries submitted to the TransformBlock are completed. How do I get my hands on the series of DataTables produced by the TransformBlock and block until all queries are complete?

Note: I acknowledge that I have only one block right now. At a minimum, I'll be adding a cancel block, so I do need/want to use TPL.

private async Task ToolStripButtonStart_Click(object sender, EventArgs e)
{

    UserInput userInput = new UserInput
    {
        MachineName = "gat-admin",
        InstanceName = "",
        DbName = "AdventureWorks2014",
    };

    DataAccessLayer dataAccessLayer = new DataAccessLayer(userInput.MachineName, userInput.InstanceName);

    //CreateTableQueryList gets a list of all tables from the DB and returns a list of 
    // select statements, one per table, e.g., SELECT * from [schemaname].[tablename]
    IList<String> tableQueryList = CreateTableQueryList(userInput);

    // Define a block that accepts a select statement and returns a DataTable of results
    // where each returned record is: schemaname + tablename + columnname + column datatype + field data
    // e.g., if the select query returns one record with 5 columns, then a datatable with 5 
    // records (one per field) will come back 

    var transformBlock_SubmitTableQuery = new TransformBlock<String, Task<DataTable>>(
        async tableQuery => await dataAccessLayer._SubmitSelectStatement(tableQuery),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
        });

    // Add items to the block and start processing
    foreach (String tableQuery in tableQueryList)
    {
        await transformBlock_SubmitTableQuery.SendAsync(tableQuery);
    }

    // Enable the Cancel button and disable the Start button.
    toolStripButtonStart.Enabled = false;
    toolStripButtonStop.Enabled = true;

    //shut down the block (no more inputs or outputs)
    transformBlock_SubmitTableQuery.Complete();

    //await the completion of the task that produces the output DataTable
    await transformBlock_SubmitTableQuery.Completion;
}

public async Task<DataTable> _SubmitSelectStatement(string queryString )
{
    try
    {

        .
        .
        await Task.Run(() => sqlDataAdapter.Fill(dt));

        // process dt into the output DataTable I need

        return outputDt;
    }
    catch
    {
        throw;
    }

}

Recommended answer

The correct way to retrieve the output of a TransformBlock is to perform a nested loop using the methods OutputAvailableAsync and TryReceive. It is a bit messy, so you could consider hiding this complexity from your application code by copy-pasting the extension method below in some static class of your project:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}

Then you could use the ToListAsync method like this:

private async void ToolStripButtonStart_Click(object sender, EventArgs e) // event handlers must return void
{
    var transformBlock = new TransformBlock<string, DataTable>(async query => //...
    //...
    transformBlock.Complete();

    foreach (DataTable dataTable in await transformBlock.ToListAsync())
    {
        // Do something with the dataTable
    }
}
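
To see the pieces working together outside a UI, here is a minimal console-style sketch. It assumes the System.Threading.Tasks.Dataflow library is referenced, and it swaps the database call for a hypothetical FakeQueryAsync helper that just returns the length of the query string; the Demo class and RunAsync method are likewise invented for illustration. It relies on the TransformBlock constructor overload that accepts a Func<TInput, Task<TOutput>>, which is why the block's output type is the result itself rather than a Task of it.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions
{
    // Same extension method as above, repeated so the sketch is self-contained.
    public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
        CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (block.TryReceive(out var item))
            {
                list.Add(item);
            }
        }
        await block.Completion.ConfigureAwait(false); // Propagate possible exception
        return list;
    }
}

public static class Demo
{
    // Hypothetical stand-in for the real database call.
    private static async Task<int> FakeQueryAsync(string query)
    {
        await Task.Delay(10);
        return query.Length;
    }

    public static async Task<List<int>> RunAsync(IEnumerable<string> queries)
    {
        // The lambda returns Task<int>, so the block's output type is int.
        var block = new TransformBlock<string, int>(
            query => FakeQueryAsync(query),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        foreach (var query in queries)
        {
            await block.SendAsync(query);
        }

        block.Complete();                 // no more input will be posted
        return await block.ToListAsync(); // drains the output, then awaits Completion
    }
}
```

Calling await Demo.RunAsync(new[] { "a", "bb", "ccc" }) should yield one result per query. With the default options the block emits results in the order the inputs were posted, even when several queries run concurrently.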

If you have upgraded your project to C# 8, you also have the option to retrieve the output in a streaming fashion, as an IAsyncEnumerable:

public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> block,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            yield return item;
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
}

This way you will be able to get your hands on each DataTable immediately after it has been cooked, without having to wait for the processing of all queries. To consume an IAsyncEnumerable, you simply move the await before the foreach:

await foreach (DataTable dataTable in transformBlock.ToAsyncEnumerable())
{
    // Do something with the dataTable
}
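
As for the build error mentioned in the question: an event handler cannot return Task, but it can be declared async void, which is the one place where async void is generally considered acceptable. The sketch below is only an illustration under that assumption. ScanForm, ClickStart, RunScanAsync, Status, and Done are hypothetical stand-ins for the real WinForms form and the scan, so that the example compiles without a UI framework.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the WinForms form that hosts the Start button.
public class ScanForm
{
    public event EventHandler StartClicked;

    public bool StartEnabled { get; private set; } = true;
    public string Status { get; private set; } = "";

    // Exists only so that callers of this sketch can observe when the handler is finished.
    private readonly TaskCompletionSource<bool> _done = new TaskCompletionSource<bool>();
    public Task Done => _done.Task;

    public ScanForm()
    {
        StartClicked += ToolStripButtonStart_Click;
    }

    // Simulates the user clicking the Start button.
    public void ClickStart() => StartClicked?.Invoke(this, EventArgs.Empty);

    // async void matches the EventHandler delegate signature.
    private async void ToolStripButtonStart_Click(object sender, EventArgs e)
    {
        StartEnabled = false;
        try
        {
            await RunScanAsync(); // e.g. post queries, Complete(), await ToListAsync()
            Status = "done";
        }
        catch (Exception ex)
        {
            // Nothing awaits an async void method, so handle errors here.
            Status = "failed: " + ex.Message;
        }
        finally
        {
            StartEnabled = true;
            _done.TrySetResult(true);
        }
    }

    // Hypothetical placeholder for the dataflow work.
    private static Task RunScanAsync() => Task.Delay(10);
}
```

Because the caller of an async void method has no Task to observe, the try/catch inside the handler is the natural place to deal with a failed scan, and the finally block is a convenient spot to re-enable the buttons.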
