我如何等待从RX主题,而不会引入竞争状态的响应? [英] How do I await a response from an RX Subject without introducing a race condition?

查看:99
本文介绍了我如何等待从RX主题,而不会引入竞争状态的响应?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个服务,它允许呼叫者异步发送命令和接收响应。在实际应用中,这些动作都相当断开(有些动作将会发送一个命令,响应将是独立的过程)。

不过,在我的测试中,我需要能够发送一个命令,然后等待继续测试前(第一)的响应。

的答复是使用RX出版,我在code第一次尝试这样的:

  service.SendCommand(BLAH);
等待service.Responses.FirstAsync();

这里的问题是,如果这个AWAIT已经被击中后响应到达 FirstAsync 才有效。如果该服务过程非常快,那么测试将挂在等待

我的下一个试图解决这个问题是调用 FirstAsync()之前发送命令,所以,这将有结果,即使它在等待前到达:

  VAR firstResponse = service.Responses.FirstAsync();
service.SendCommand(嗒嗒);
等待firstResponse;

然而,这仍不能以相同的方式。现在看来似乎只有当等待被击中( GetAwaiter 的叫法),它开始监听;因此完全相同的竞争条件存在。

如果我改变我的主题为 ReplaySubject 与缓冲区(或定时器),那么我可以变通方法这一点;但它没有意义做,在我的生产班;它只会是用于测试。

什么是正确的方式能在RX做到这一点?我如何设置的东西,将收到的第一个事件上的数据流中,将不会引入竞争条件的方法吗?

下面是一个小的测试,说明了单线程的方式的问题。本次测试将indefintely挂起:

  [事实]
公共异步任务MyTest的()
{
    变种x =新的受试对象;布尔>();    //订阅第一布尔(但不要等待它尚未)
    变种firstBool = x.FirstAsync();    //发送第一个布尔
    x.OnNext(真);    //恭候接收第一布尔任务
    变种B =等待firstBool; //< - 挂在这里; presumably因为直到GetAwaiter被称为firstBool没有开始监测?
    Assert.Equal(真,B);
}

我甚至尝试调用replay()在我的测试以为它会缓冲效果;但是,这并不能改变什么:

  [事实]
公共异步任务MyTest的()
{
    变种x =新的受试对象;布尔>();    变种firstBool = x.Replay();    //发送第一个布尔
    x.OnNext(真);    //恭候接收第一布尔任务
    变种B =等待firstBool.FirstAsync(); //< - 仍然挂起这里
    Assert.Equal(真,B);
}


解决方案

您可以做到这一点的 AsyncSubject

  [事实]
公共异步任务MyTest的()
{
    变种x =新的受试对象;布尔>();    VAR firstBool = x.FirstAsync()PublishLast()。 // PublishLast包装一个AsyncSubject
    firstBool.Connect();    //发送第一个布尔
    x.OnNext(真);    //恭候接收第一布尔任务
    变种B =等待firstBool;
    Assert.Equal(真,B);
}

AsyncSubject 基本缓存的onComplete 调用之前的最后一个接收到的值,然后重新播放它。

I have a service that allows a caller to send commands and receive responses asynchronously. In a real application, these actions are fairly disconnected (some action will send a command, and the responses will be process independently).

However, in my tests, I need to be able to send a command and then wait for the (first) response before continuing the test.

The responses are published using RX, and my first attempt at the code was something like this:

service.SendCommand("BLAH");
await service.Responses.FirstAsync();

The problem with this, is that FirstAsync will only work if the response arrives after this await has already been hit. If the service processes very quickly, then the test will hang on the await.

My next attempt to fix this was to call FirstAsync() prior to sending the command, so that it would have the result even if it arrived before awaiting:

var firstResponse = service.Responses.FirstAsync();
service.SendCommand("BLAH");
await firstResponse;

However, this still fails in the same way. It seems like it's only when the await is hit (GetAwaiter is called) that it starts listening; so the exact same race condition exists.

If I change my Subject to a ReplaySubject with a buffer (or timer) then I can "workaround" this; however it doesn't make sense to do that in my production classes; it would only be for testing.

What's the "correct" way to be able to do this in RX? How can I set up something that will receive the first event on a stream in a way that won't introduce a race condition?

Here's a small test that illustrates the issue in a "single-threaded" way. This test will hang indefintely:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    // Subscribe to the first bool (but don't await it yet)
    var firstBool = x.FirstAsync();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool; // <-- hangs here; presumably because firstBool didn't start monitoring until GetAwaiter was called?


    Assert.Equal(true, b);
}

I even tried calling Replay() in my test thinking it would buffer the results; but that doesn't change anything:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.Replay();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool.FirstAsync(); // <-- Still hangs here


    Assert.Equal(true, b);
}

解决方案

You can do this with an AsyncSubject

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.FirstAsync().PublishLast(); // PublishLast wraps an AsyncSubject
    firstBool.Connect();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool;


    Assert.Equal(true, b);
}

AsyncSubject basically caches the last received value before OnComplete is called and then replays it.

这篇关于我如何等待从RX主题,而不会引入竞争状态的响应?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆