将TaskCompletionSource用作WaitHandle替代品是否可以接受? [英] Is it acceptable to use TaskCompletionSource as a WaitHandle substitute?

查看:62
本文介绍了将TaskCompletionSource用作WaitHandle替代品是否可以接受?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的代码使用ConcurrentQueue处理到远程主机的TCP连接,以存储传出消息.它旨在在单个线程中运行.连接的生存期包含在RunAsync中,而单独的对象包含连接的公共状态":

My code handles a TCP connection to a remote host, with a ConcurrentQueue to store outgoing messages. It's intended to run in a single thread. The lifetime of the connection is contained within RunAsync while a separate object contains the "public state" of the connection:

class PublicState
{
    internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
    internal TaskCompletionSource<Object> OutgoingMessageTcs = null;

    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessages(IEnumerable<Message> messages)
    {
        foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
        if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
        this.OutgoingMessageTcs.SetResult( null );
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
    using( TcpClient tcp = new TcpClient() )
    {
        await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[ 4096 ];

        using( NetworkStream ns = tcp.GetStream() )
        {
            state.ConnectedTcs.SetResult( null );

            Task<Int32> nsReadTask = null;

            while( tcp.Connected )
            {
                if( !state.writeQueue.IsEmpty )
                {
                    await WriteMessagesAsync( ... ).ConfigureAwait( false );
                }

                if( ns.DataAvailable )
                {
                    await ReadMessagesAsync( ... ).ConfigureAwait( false );
                }

                // Wait for new data to arrive from remote host or for new messages to send:
                if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
                if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false );

                Task c = await Task.WhenAny( state.OutgoingMessageTcs,  nsReadTask ).ConfigureAwait( false );
                if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
                else if( c == nsReadTask ) nsReadTask = null;
            }
        }
    }
}

像这样使用:

public async Task Main(String[] args)
{
    PublicState state = new PublicState();
    Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state );

    await state.ConnectedTcs.Task; // awaits until TCP connection is established

    state.EnqueueMessage( new Message("foo") );
    state.EnqueueMessage( new Message("bar") );
    state.EnqueueMessage( new Message("baz") );

    await clientTask; // awaits until the TCP connection is closed
}

此代码有效,但我不喜欢它:感觉就像我在使用TaskCompletionSource,它表示一个实际的Task或某种后台操作,而我实际上是在使用TaskCompletionSource一种便宜的EventWaitHandle.我不使用EventWaitHandle,因为它是IDisposable(我不想冒泄漏本机资源的风险),并且它缺少WaitAsyncWaitOneAsync方法.我可以使用SemaphoreSlim(可以等待,但包装了EventWaitHandle),但是我的代码并不能真正代表信号灯的良好使用.

This code works, but I don't like it: it feels like I'm using TaskCompletionSource which is meant to represent an actual Task or some kind of background operation, whereas I'm really using TaskCompletionSource as a kind of cheap EventWaitHandle. I'm not using EventWaitHandle because it's IDisposable (I don't want to risk leaking native resources) and it lacks a WaitAsync or WaitOneAsync method. I could use SemaphoreSlim (which is awaitable, but wraps an EventWaitHandle) but my code doesn't really represent a good use of a semaphore.

我对TaskCompletionSource<T>的使用是否可以接受,或者在将项目添加到OutgoingMessageQueue时是否有更好的方法在"RunAsync"中等待"执行?

Is my use of TaskCompletionSource<T> acceptable, or is there a better way to "un-await" execution in RunAsync when an item is added to OutgoingMessageQueue?

我觉得这是错误"的另一个原因是TaskCompletionSource<T>只能使用一次,然后需要替换.我渴望避免不必要的分配.

Another reason I feel it's "wrong" is that TaskCompletionSource<T> can only be used once, then it needs replacing. I'm keen to avoid extraneous allocations.

推荐答案

如果我对您的理解正确-TPL BufferBlock可能就是您所需要的.当前Enqueue的模拟为Post,您可以通过ReceiveAsync扩展方法接收下一个值.

If I understood you correctly - TPL BufferBlock might be what you need. Analog of current Enqueue is Post, and you can receive next value via ReceiveAsync extension method.

因此,使用BufferBlock,您的代码将变成这样:

So with BufferBlock your code becomes something like this:

class PublicState {
    internal readonly BufferBlock<Message> OutgoingMessageQueue = new BufferBlock<Message>();
    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessage(Message message) {
        this.OutgoingMessageQueue.Post(message);
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state) {
    using (TcpClient tcp = new TcpClient()) {
        await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[4096];

        using (NetworkStream ns = tcp.GetStream()) {
            state.ConnectedTcs.SetResult(null);

            Task<Int32> nsReadTask = null;
            Task<Message> newMessageTask = null;
            while (tcp.Connected) {
                // Wait for new data to arrive from remote host or for new messages to send:
                if (nsReadTask == null)
                    nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0);
                if (newMessageTask == null)
                    newMessageTask = state.OutgoingMessageQueue.ReceiveAsync();
                var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false);
                if (completed == newMessageTask) {
                    var result = await newMessageTask;
                    // do stuff
                    newMessageTask = null;
                }
                else {
                    var bytesRead = await nsReadTask;
                    nsReadTask = null;
                }
            }
        }
    }
}

作为一个奖励,该版本(我认为)是线程安全的,而当前版本则不是,因为您正在使用OutgoingMessageTcs从潜在的多个线程(RunAsyncEnqueueMessages调用者的线程).

As a bonus, this version is (I think) thread-safe, while your current version is not, because you are doing non-thread-safe things with OutgoingMessageTcs from potentially multiple threads (thread of RunAsync and thread of EnqueueMessages caller).

如果由于某种原因您不喜欢BufferBlock-您可以以完全相同的方式使用Nito.AsyncEx nuget包中的AsyncCollection.初始化变为:

If for some reason you don't like BufferBlock - you can use AsyncCollection from Nito.AsyncEx nuget package in exactly the same way. Initialization becomes:

internal readonly AsyncCollection<Message> OutgoingMessageQueue = new AsyncCollection<Message>(new ConcurrentQueue<Message>());

并获取:

if (newMessageTask == null)
   newMessageTask = state.OutgoingMessageQueue.TakeAsync();

这篇关于将TaskCompletionSource用作WaitHandle替代品是否可以接受?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆