不停止序列的反应性扩展超时? [英] Reactive Extensions Timeout that doesn't stop sequence?
问题描述
我正在尝试制作一个 IObservable
返回 true
如果在过去 5 秒内收到了 UDP 消息并且发生超时,则返回 true
返回假.
I'm trying to make an IObservable<bool>
that returns true
if a UDP Message has been received in the last 5 seconds and if a timeout occurs, a false is returned.
到目前为止,我有这个:
So far I have this:
public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
var udp = BaseComms.UDPBaseStringListener(localEP)
.Where(msg => msg.Data.Contains("running"))
.Select(s => true);
return Observable
.Timeout(udp, TimeSpan.FromSeconds(5))
.Catch(Observable.Return(false));
}
与此有关的问题是:-
The issues with this are:-
- 一旦返回false,序列就会停止
- 我只需要
true
或false
状态更改.
- Once a false is returned, the sequence stops
- I only really need
true
orfalse
on state changes.
我可以使用 Subject
,但是当没有更多订阅者时,我需要小心处理 UDPBaseStringListener
observable.
I could use a Subject<T>
but I need to be careful to dispose of the UDPBaseStringListener
observable when there are no more subscribers.
更新
每次收到 UDP 消息时,我都希望它返回 true
.如果我在过去 5 秒内没有收到 UDP 消息,我希望它返回 false
.
Every time I get a UDP message I would like it to return a true
. If I haven't received a UDP message in the last 5 seconds, I would like it to return a false
.
推荐答案
正如 Bj0 所指出的,解决方案与BufferWithTime
不会在收到数据点后立即返回数据点,并且在收到数据点后不会重置缓冲区超时.
As pointed out by Bj0, the solution with BufferWithTime
will not return the data point as soon as it is received and the buffer timeout is not reset after receiving a data point.
使用 Rx Extensions 2.0,您可以通过接受超时和大小的新缓冲区重载来解决这两个问题:
With Rx Extensions 2.0, your can solve both problems with a new Buffer overload accepting both a timeout and a size:
static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
return BaseComms
.UDPBaseStringListener(localEP)
.Where(msg => msg.Data.Contains("running"))
.Buffer(TimeSpan.FromSeconds(5), 1)
.Select(s => s.Count > 0)
.DistinctUntilChanged();
}
这篇关于不停止序列的反应性扩展超时?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!