异步检查值而不会阻塞线程? [英] Asynchronously checking a value without bogging down a thread?

查看:89
本文介绍了异步检查值而不会阻塞线程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的应用程序中有一个模式,我想发出一些命令或开始一些IO工作并等待它完成,或者有某种机制来知道它是否已完成.

I have a pattern in my application where I would like to send out some command or start some IO work and wait for it to be completed, or have some mechanism to know if it has completed.

为此,我计划使用 async / await 模式.我认为大多数人都已经达到了编码的目的.

For this I plan to use the async/await pattern. I assume most people have come to the point in coding where they reach this pattern.

while(something)
    DoNothing();

DoNothing()通常会消耗一些CPU时间,或者只是完全停止程序.我认为解决此问题的正确方法是使用 async / await 模式.

Where DoNothing() usually ends up eating up some CPU time or just stopping the program altogether. I assume the proper way to get around this is with the async/await pattern.

就我而言,我想出了以下简单方法

In my case I have come up with the following simple method

public override async Task<Boolean> PerformProcessingAsync()
{
    StartSomeIOProcessing();
    while (TheIOProcessingResult == null)
        await Task.Yield();
    return true;
}

哪个开始一些IO处理,然后等待实例化结果.同时,尽管它调用 Task.Yield 返回到调用上下文,该上下文可以继续工作,还可以将此方法的延续(下一次while循环迭代)放到调用堆栈中.

Which starts some IO processing and then waits for the result to be instantiated. In the meantime though it calls Task.Yield to return to the calling context which can then continue work and also places a continuation of this method (the next while loop iteration) onto the call stack.

这是正确的解释吗,这是解决上述情况的正确方法吗?

编辑:在我要面对的更具体的情况下,我还维护了另一个执行IO工作的库,基本上是对 SerialPort 对象进行读写.该库通过对特定端口进行读取的 ConcurrentQueue 进行工作.一个 Task 坐在此队列的末尾",并随着工作的进行而消耗工作.

In a more specific situation that I am facing... I also maintain another library that performs IO work, basically reading and writing to SerialPort objects. This library works by having a ConcurrentQueue of reads or writes that it processes with respect to a specific port. A Task "sits at the end" of this queue and consume the work as it goes.

大多数阅读工作只是查询一些值,解析它并触发事件 NewData(double myNewData),UI会监听该事件.在UI中,保存了 SerialPort 数据的状态表示.在状态表示中,将处理 NewData 事件,该事件将更新其对应值.

Most reading work simply queries some value, parses it, and fires an event NewData(double myNewData) which is listened to by the UI. In the UI a state representation is held of the SerialPorts data. In the state representation the NewData event is handled which updates the value of whatever it corresponds to.

以相同的方式完成写入,但是完成后不会触发任何事件,只需将端口写入即可.判断是否成功的唯一方法是等待读取以更新状态.(不幸的是,由于硬件的行为方式,没有更好的方法可以做到这一点.)

Writing is done in the same manner but there is no event fired upon completion, the port is simply written to. The only way to tell if it was successful is to wait for the reads to update the state. (Sadly because of how the hardware acts there is no better way to do this).

我当前的应用程序利用此库执行其IO工作.读操作会定期发送到库,以使UI保持端口中的新值...当用户单击按钮时,写操作将发送到库,以从硬件中命令某些内容.

My current application makes use of this library to perform its IO work. Reads are periodically sent to the library to keep the UI with fresh values from the ports... When a user clicks a button writes are sent off to the library to command something from the hardware.

我处于一种情况下,想要确保写入是通过编程方式发生的.我能想到的唯一方法是将写入发送到库,然后等待读取以更新写入效果的数据.

I am in the situation where I want to assure that the write has happened programmatically. The only way I can think to do this is to send the write off to the library, and wait for the read to update the data the write effects.

因此循环.

推荐答案

我正要确保以编程方式进行写操作.我能想到的唯一方法是将写入发送到库,然后等待读取以更新写入效果的数据.

I am in the situation where I want to assure that the write has happened programmatically. The only way I can think to do this is to send the write off to the library, and wait for the read to update the data the write effects.

是的,这可能是唯一的方法,但是有比旋转等待看读取更新是否发生更好的方法.

Yes, that likely is the only way to do it, but there are better ways than spin waiting to see if the read update happens.

这是处理处理的示例方式,就像我们在聊天中所讨论的那样我们做到了.总之,您有一个 IDataRequest 队列,在这些请求中,它们包含一个 TaskCompletionSource< T> ,代表发送数据的完成.向设备发出请求并获得响应后,就可以设置完成源的结果.我将其与您现有的基于事件的实现结合在一起,但是说实话,我将放弃该事件,仅让调用者在等待 RequestTemp()的结果后对UI进行更新.

Here is a example way to handle the processing like we talked in the chat we did. In summary, you have a queue of IDataRequest, in those request's they hold a TaskCompletionSource<T> that represents the completion of sending the data. Once you do a request to the device and get a response you set the result of the completion source. I combined it with your existing event based implmentation, but honestly I would drop the event's and just have the callers do the update to the UI after awaiting the result of RequestTemp().

public interface IDataRequest
{
    bool TrySetException(Exception ex);
}

public abstract class DataRequest<T> : IDataRequest
{
    public TaskCompletionSource<T> RequestTask { get; } = new TaskCompletionSource<T>();

    public bool TrySetException(Exception ex)
    {
        return RequestTask.TrySetException(ex);
    }
}

public class TempRequest : DataRequest<double>
{
}

public class RpmRequest : DataRequest<int>
{
}

public sealed class DeviceManager : IDisposable
{

    private readonly Task _workerThread;
    private readonly BlockingCollection<IDataRequest> _queue;
    private readonly SerialPort _serialPort;

    public DeviceManager()
    {
        _queue = new BlockingCollection<IDataRequest>();
        _workerThread = Task.Factory.StartNew(ProcessQueue, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        _serialPort = //...
    }
    
    public event EventHandler<double> TempUpdate;
    public event EventHandler<int> RpmUpdate;

    public Task<double> RequestTemp()
    {
        var request = new TempRequest();
        _queue.Add(request);
        return request.RequestTask.Task;
    }

    public Task<int> RequestRpm()
    {
        var request = new RpmRequest();
        _queue.Add(request);
        return request.RequestTask.Task;
    }
    public void Dispose()
    {
        _queue.CompleteAdding();
        _workerThread.Wait();
    }

    private void ProcessQueue()
    {
        foreach (var dataRequest in _queue.GetConsumingEnumerable())
        {
            try
            {
                if (dataRequest is TempRequest)
                {
                    DoTempRequest((TempRequest)dataRequest);
                }
                else if (dataRequest is RpmRequest)
                {
                    DoRpmRequest((RpmRequest)dataRequest);
                }
                else
                {
                    throw new NotSupportedException($"A Request of type {dataRequest.GetType()} is not supported.");
                }
            }
            catch (Exception ex)
            {
                dataRequest.TrySetException(ex);
            }
        }
    }

    private void DoTempRequest(TempRequest dataRequest)
    {
        _serialPort.WriteLine("Temp ?");
        var line = _serialPort.ReadLine();
        double result;

        //I am deliberately using Parse instead of TryParse so responses that 
        //fail to parse will throw and get their exception propagated up via the 
        //catch in ProcessQueue().
        result = double.Parse(line);
        
        //Sends the info back to the caller saying it is done and what the result was.
        dataRequest.RequestTask.TrySetResult(result);

        //Raises the event so subscribers know the new value.
        OnTempUpdate(result);
    }

    private void DoRpmRequest(RpmRequest dataRequest)
    {
        _serialPort.WriteLine("RPM ?");
        var line = _serialPort.ReadLine();
        int result;
        result = int.Parse(line);
        
        dataRequest.RequestTask.TrySetResult(result);
        OnRpmUpdate(result);
        
    }

    private void OnTempUpdate(double result)
    {
        TempUpdate?.Invoke(this, result);
    }

    private void OnRpmUpdate(int result)
    {
        RpmUpdate?.Invoke(this, result);
    }
}

这篇关于异步检查值而不会阻塞线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆