将阻塞调用包装为异步,以更好地重用线程和响应式UI [英] Wrapping blocking calls to be async for better thread reuse and responsive UI
问题描述
我有一个班级,负责通过调用旧版班级来检索产品的可用性.这个旧类本身通过进行BLOCKING网络调用在内部收集产品数据. 请注意,我无法修改旧版API的代码.由于所有产品彼此独立,因此我希望并行化收集信息,而不会创建任何不必要的线程,也不会阻塞在调用此旧版API时被阻塞的线程.在这种背景下,这是我的基本课程.
I have a class that is responsible for retrieving a product availability by making call to a legacy class. This legacy class itself internally collects product data by making BLOCKING network calls. Note that I cannot modify code of legacy API. Since all products are independent to each other, I would like to parallelise collecting the information without creating any unnecessary threads and also not blocking thread that gets blocked on calling this legacy API. With this background here are my basic classes.
class Product
{
public int ID { get; set; }
public int VendorID { get; set; }
public string Name { get; set; }
}
class ProductSearchResult
{
public int ID { get; set; }
public int AvailableQuantity { get; set; }
public DateTime ShipDate { get; set; }
public bool Success { get; set; }
public string Error { get; set; }
}
class ProductProcessor
{
List<Product> products;
private static readonly SemaphoreSlim mutex = new SemaphoreSlim(2);
CancellationTokenSource cts = new CancellationTokenSource();
public ProductProcessor()
{
products = new List<Product>()
{
new Product() { ID = 1, VendorID = 100, Name = "PC" },
new Product() { ID = 2, VendorID = 101, Name = "Tablet" },
new Product() { ID = 3, VendorID = 100, Name = "Laptop" },
new Product() { ID = 4, VendorID = 102, Name = "GPS" },
new Product() { ID = 5, VendorID = 107, Name = "Mars Rover" }
};
}
public async void Start()
{
Task<ProductSearchResult>[] tasks = new Task<ProductSearchResult>[products.Count];
Parallel.For(0, products.Count(), async i =>
{
tasks[i] = RetrieveProductAvailablity(products[i].ID, cts.Token);
});
Task<ProductSearchResult> results = await Task.WhenAny(tasks);
// Logic for waiting on indiviaul tasks and reporting results
}
private async Task<ProductSearchResult> RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
ProductSearchResult result = new ProductSearchResult();
result.ID = productId;
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
try
{
await mutex.WaitAsync();
if (cancellationToken.IsCancellationRequested)
{
result.Success = false;
result.Error = "Cancelled.";
return result;
}
LegacyApp app = new LegacyApp();
bool success = await Task.Run(() => app.RetrieveProductAvailability(productId));
if (success)
{
result.Success = success;
result.AvailableQuantity = app.AvailableQuantity;
result.ShipDate = app.ShipDate;
}
else
{
result.Success = false;
result.Error = app.Error;
}
}
finally
{
mutex.Release();
}
return result;
}
}
鉴于我试图通过同步API包装异步,我有两个问题.
Given that I am trying to wrap async over a synchronous API, I have two questions.
- 通过使用Parallel.For并将Legay API调用包装在Task.Run中,我是否正在创建任何本可以避免而又不会阻塞调用线程的线程,因为我们将在UI中使用此代码.
- 此代码是否仍然看起来是线程安全的?
推荐答案
编译器会向您发出有关async
lambda的警告.仔细阅读;告诉您它不是异步的.在那里使用async
没有意义.另外,请勿使用async void
.
The compiler will give you warnings about your async
lambda. Read it carefully; it's telling you that it's not asynchronous. There's no point in using async
there. Also, do not use async void
.
由于您的基础API处于阻塞状态-无法更改-异步代码不是一种选择.我建议使用几个Task.Run
调用或 Parallel.For
,但不要同时使用.因此,让我们使用并行.实际上,由于您正在转换一个序列,所以我们使用Parallel LINQ.
Since your underlying API is blocking - and there's no way to change that - asynchronous code isn't an option. I'd recommend either using several Task.Run
calls or Parallel.For
, but not both. So let's use parallel. Actually, let's use Parallel LINQ since you're transforming a sequence.
使RetrieveProductAvailablity
异步是没有意义的.除了节流之外,它仅做阻塞工作,并且并行方法具有更自然的节流支持.这使您的方法看起来像这样:
There's no point in making RetrieveProductAvailablity
asynchronous; it's only doing blocking work except for the throttling, and the parallel approach has more natural throttling support. This leaves your method looking like:
private ProductSearchResult RetrieveProductAvailablity(int productId, CancellationToken cancellationToken)
{
... // no mutex code
LegacyApp app = new LegacyApp();
bool success = app.RetrieveProductAvailability(productId);
... // no mutex code
}
然后您可以像这样进行并行处理:
You can then do parallel processing as such:
public void Start()
{
ProductSearchResult[] results = products.AsParallel().AsOrdered()
.WithCancellation(cts.Token).WithDegreeOfParallelism(2)
.Select(product => RetrieveProductAvailability(product.ID, cts.Token))
.ToArray();
// Logic for waiting on indiviaul tasks and reporting results
}
从UI线程中,您可以使用Task.Run
调用:
From your UI thread, you can call the method using Task.Run
:
async void MyUiEventHandler(...)
{
await Task.Run(() => processor.Start());
}
这可以使您的业务逻辑保持整洁(仅同步/并行代码),并且将工作移出UI线程(使用Task.Run
)的责任属于UI层.
This keeps your business logic clean (only synchronous/parallel code), and the responsibility for moving this work off the UI thread (using Task.Run
) belongs to the UI layer.
更新:我添加了对AsOrdered
的调用,以确保结果数组的顺序与产品顺序相同.这可能有必要,也可能没有必要,但是由于保留了原始代码的顺序,因此现在也是如此.
Update: I added a call to AsOrdered
to ensure the results array has the same order as the products sequence. This may or may not be necessary, but since the original code preserved order, this code does now too.
更新:由于您需要在每次检索后更新用户界面,因此您可能应该对每个用户使用Task.Run
而不是AsParallel
:
Update: Since you need to update the UI after every retrieval, you should probably use Task.Run
for each one instead of AsParallel
:
public async Task Start()
{
var tasks = products.Select(product =>
ProcessAvailabilityAsync(product.ID, cts.Token));
await Task.WhenAll(tasks);
}
private SemaphoreSlim mutex = new SempahoreSlim(2);
private async Task ProcessAvailabilityAsync(int id, CancellationToken token)
{
await mutex.WaitAsync();
try
{
var result = await RetrieveProductAvailability(id, token);
// Logic for reporting results
}
finally
{
mutex.Release();
}
}
这篇关于将阻塞调用包装为异步,以更好地重用线程和响应式UI的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!