将阻塞调用包装为异步,以更好地重用线程和响应式UI [英] Wrapping blocking calls to be async for better thread reuse and responsive UI

查看:54
本文介绍了将阻塞调用包装为异步,以更好地重用线程和响应式UI的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个班级,负责通过调用旧版班级来检索产品的可用性.这个旧类本身通过进行BLOCKING网络调用在内部收集产品数据. 请注意,我无法修改旧版API的代码.由于所有产品彼此独立,因此我希望并行化收集信息,而不会创建任何不必要的线程,也不会阻塞在调用此旧版API时被阻塞的线程.在这种背景下,这是我的基本课程.

I have a class that is responsible for retrieving a product availability by making call to a legacy class. This legacy class itself internally collects product data by making BLOCKING network calls. Note that I cannot modify code of legacy API. Since all products are independent to each other, I would like to parallelise collecting the information without creating any unnecessary threads and also not blocking thread that gets blocked on calling this legacy API. With this background here are my basic classes.

class Product
    {
        public int ID { get; set; }
        public int  VendorID { get; set; }
        public string Name { get; set; }
    }

    class ProductSearchResult
    {
        public int ID { get; set; }
        public int AvailableQuantity { get; set; }
        public DateTime ShipDate { get; set; }
        public bool Success { get; set; }
        public string Error { get; set; }
    }

class ProductProcessor
    {
        List<Product> products;
        private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
        CancellationTokenSource cts = new CancellationTokenSource();
        public ProductProcessor()
        {
            products = new List<Product>()
            {
                new Product() { ID = 1, VendorID = 100, Name = "PC" },
                new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
                new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
                new Product() { ID = 4, VendorID = 102, Name = "GPS" },
                new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
            };

        }

        public async void Start()
        {
            Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
            Parallel.For(0, products.Count(), async i =>
            {
                tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);

            });



            Task<ProductSearchResult> results = await Task.WhenAny(tasks);

            // Logic for waiting on indiviaul tasks and reporting results

        }

        private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
        {
            ProductSearchResult result = new ProductSearchResult();
            result.ID = productId;

            if (cancellationToken.IsCancellationRequested)
            {
                result.Success = false;
                result.Error = "Cancelled.";
                return result;
            }

            try
            {
                await mutex.WaitAsync();
                if (cancellationToken.IsCancellationRequested)
                {
                    result.Success = false;
                    result.Error = "Cancelled.";
                    return result;
                }

                LegacyApp app = new LegacyApp();
                bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
                if (success)
                {
                    result.Success = success;
                    result.AvailableQuantity = app.AvailableQuantity;
                    result.ShipDate = app.ShipDate;
                }
                else
                {
                    result.Success = false;
                    result.Error = app.Error;
                }
            }
            finally
            {
                mutex.Release();
            }

            return result;

        }

    }

鉴于我试图通过同步API包装异步,我有两个问题.

Given that I am trying to wrap async over a synchronous API, I have two questions.

  1. 通过使用Parallel.For并将Legay API调用包装在Task.Run中,我是否正在创建任何本可以避免而又不会阻塞调用线程的线程,因为我们将在UI中使用此代码.
  2. 此代码是否仍然看起来是线程安全的?

推荐答案

编译器会向您发出有关async lambda的警告.仔细阅读;告诉您它不是异步的.在那里使用async没有意义.另外,请勿使用async void.

The compiler will give you warnings about your async lambda. Read it carefully; it's telling you that it's not asynchronous. There's no point in using async there. Also, do not use async void.

由于您的基础API处于阻塞状态-无法更改-异步代码不是一种选择.我建议使用几个Task.Run调用 Parallel.For,但不要同时使用.因此,让我们使用并行.实际上,由于您正在转换一个序列,所以我们使用Parallel LINQ.

Since your underlying API is blocking - and there's no way to change that - asynchronous code isn't an option. I'd recommend either using several Task.Run calls or Parallel.For, but not both. So let's use parallel. Actually, let's use Parallel LINQ since you're transforming a sequence.

使RetrieveProductAvailablity异步是没有意义的.除了节流之外,它仅做阻塞工作,并且并行方法具有更自然的节流支持.这使您的方法看起来像这样:

There's no point in making RetrieveProductAvailablity asynchronous; it's only doing blocking work except for the throttling, and the parallel approach has more natural throttling support. This leaves your method looking like:

private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
  ... // no mutex code
  LegacyApp app = new LegacyApp();
  bool success = app.RetrieveProductAvailability(productId);
  ... // no mutex code
}

然后您可以像这样进行并行处理:

You can then do parallel processing as such:

public void Start()
{
  ProductSearchResult[] results = products.AsParallel().AsOrdered()
      .WithCancellation(cts.Token).WithDegreeOfParallelism(2)
      .Select(product => RetrieveProductAvailability(product.ID, cts.Token))
      .ToArray();
  // Logic for waiting on indiviaul tasks and reporting results
}

从UI线程中,您可以使用Task.Run 调用:

From your UI thread, you can call the method using Task.Run:

async void MyUiEventHandler(...)
{
  await Task.Run(() => processor.Start());
}

这可以使您的业务逻辑保持整洁(仅同步/并行代码),并且将工作移出UI线程(使用Task.Run)的责任属于UI层.

This keeps your business logic clean (only synchronous/parallel code), and the responsibility for moving this work off the UI thread (using Task.Run) belongs to the UI layer.

更新:我添加了对AsOrdered的调用,以确保结果数组的顺序与产品顺序相同.这可能有必要,也可能没有必要,但是由于保留了原始代码的顺序,因此现在也是如此.

Update: I added a call to AsOrdered to ensure the results array has the same order as the products sequence. This may or may not be necessary, but since the original code preserved order, this code does now too.

更新:由于您需要在每次检索后更新用户界面,因此您可能应该对每个用户使用Task.Run而不是AsParallel:

Update: Since you need to update the UI after every retrieval, you should probably use Task.Run for each one instead of AsParallel:

public async Task Start()
{
  var tasks = products.Select(product =>
      ProcessAvailabilityAsync(product.ID, cts.Token));
  await Task.WhenAll(tasks);
}

private SemaphoreSlim mutex = new SempahoreSlim(2);
private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
{
  await mutex.WaitAsync();
  try
  {
    var result = await RetrieveProductAvailability(id, token);
    // Logic for reporting results
  }
  finally
  {
    mutex.Release();
  }
}

这篇关于将阻塞调用包装为异步,以更好地重用线程和响应式UI的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆