这两个可观察的操作是否等效? [英] Are these two Observable Operations Equivalent?

查看:93
本文介绍了这两个可观察的操作是否等效?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我不确定为什么,但是由于某些原因,当使用通过concat创建的observable时,我将始终获得从列表中推送的所有值(按预期工作).就像普通订阅一样,似乎某些值永远不会使那些订阅了可观察对象的人(仅在某些条件下)成为可能.

I'm not sure why, but for some reason when using the observable that is created via concat I will always get all values that are pushed from my list (works as intended). Where as with the normal subscribe it seems that some values never make it to those who have subscribed to the observable (only in certain conditions).

这是我正在使用的两种情况.任何人都可以尝试解释为什么在某些情况下订阅第二个版本时未收到所有值吗?它们不相等吗?这里的目的是倒带流.有哪些原因可以解释为什么案例2失败而案例1没有失败.

These are the two cases that I am using. Could anyone attempt to explain why in certain cases when subscribing to the second version not all values are received? Are they not equivalent? The intent here is to rewind the stream. What are some reasons that could explain why Case 2 fails while Case 1 does not.

此处重播只是正在进行中的视频流的列表.

Replay here is just a list of the ongoing stream.

案例1.

let observable = 
        Observable.Create(fun (o:IObserver<'a>) ->
                        let next b =
                            for v in replay do
                                o.OnNext(v.Head)
                            o.OnNext(b)
                            o.OnCompleted()
                        someOtherObs.Subscribe(next, o.OnError, o.OnCompleted))
let toReturn = observable.Concat(someOtherObs).Publish().RefCount()

案例2.

let toReturn = 
    Observable.Create(fun (o:IObserver<'a>) ->
        for v in replay do
            o.OnNext(v.Head)
        someOtherObs.Subscribe(o)
    ).Publish().RefCount()

推荐答案

注意!我没有经常使用F#来使语法100%满意,但我想我知道发生了什么事.

Caveat! I don't use F# regularly enough to be 100% comfortable with the syntax, but I think I see what's going on.

也就是说,这两种情况对我来说都很奇怪,这在很大程度上取决于someOtherObs是如何实现的以及(在线程方面)运行的地方.

That said, both of these cases look odd to me and it greatly depends on how someOtherObs is implemented, and where (in terms of threads) things are running.

您将concat应用于看起来像这样的源流:

You apply concat to a source stream which appears to work like this:

  • 它订阅someOtherObs,并响应第一个事件(a),将重播元素推送给观察者.
  • 然后将事件(a)发送给观察者.
  • 然后完成.至此,流已结束,不再发送任何事件.
  • 如果someOtherObs为空或只有一个错误,它将被传播给观察者.

现在,当此流完成时,会将someOtherObs连接到其上.现在发生的情况有点不可思议-如果someOtherObs很冷,则将第二次发送第一个事件,如果someOtherObs很热,则不会重新发送第一个事件,但是在剩余的事件周围存在潜在的竞争条件下一步,这取决于实现someOtherObs的方式.如果天气炎热,您很容易错过活动.

Now, when this stream completes, someOtherObs is concatenated on to it. What happens now is a little unpreditcable - if someOtherObs is cold, then the first event would be sent a second time, if someOtherObs is hot, then the first event is not resent, but there's a potential race condition around which event of the remainder will go next which depends on how someOtherObs is implemented. You could easily miss events if it's hot.

您重播所有重播事件,然后发送someOtherObs的所有事件-但是如果someOtherObs很热,则再次出现竞争状况,因为您仅在推送重播后才订阅,因此可能会错过一些事件.

You replay all the replay events, and then send all the events of someOtherObs - but again there's a race condition if someOtherObs is hot because you only subscribe after pushing replay, and so might miss some events.

无论哪种情况,对我来说都是一团糟.

In either case, it seems messy to me.

这看起来像是尝试将世界状态(sotw)与实时流进行合并.在这种情况下,您需要先订阅实时流,并缓存所有事件,然后再获取并推送sotw事件.推送sotw后,您推送缓存的事件-小心地删除在sotw中读取的事件的重复数据-直到赶上现场直播时为止,尽管如此,您仍然可以传递现场事件.

This looks like an attempt to do a merge of a state of the world (sotw) and a live stream. In this case, you need to subscribe to the live stream first, and cache any events while you then acquire and push the sotw events. Once sotw is pushed, you push the cached events - being careful to de-dupe events that may been read in the sotw - until you are caught up with live at which point you can just pass live events though.

您常常可以避免使用天真的实现,这些实现会在实时流订阅的OnNext处理程序中刷新实时缓存,从而在刷新时有效地阻塞了源-但如果存在以下情况,您就有对实时源施加过多背压的风险:您拥有悠久的历史和/或实时直播.

You can often get away with naive implementations that flush the live cache in an OnNext handler of the live stream subscription, effectively blocking the source while you flush - but you run the risk of applying too much back pressure to the live source if you have a large history and/or a fast moving live stream.

您需要考虑的一些注意事项将有望使您走上正确的道路.

Some considerations for you to think on that will hopefully set you on the right path.

作为参考,这是我用RX-main nuget软件包在LINQPad中编译的非常简单和简单的C#实现.我过去完成的可用于生产的实现可能会变得非常复杂:

For reference, here is an extremely naïve and simplistic C# implementation I knocked up that compiles in LINQPad with rx-main nuget package. Production ready implementations I have done in the past can get quite complex:

void Main()
{
    // asynchronously produce a list from 1 to 10
    Func<Task<List<int>>> sotw =
        () => Task<List<int>>.Run(() => Enumerable.Range(1, 10).ToList());
    
    // a stream of 5 to 15
    var live = Observable.Range(5, 10);
    
    // outputs 1 to 15
    live.MergeSotwWithLive(sotw).Subscribe(Console.WriteLine);
}

// Define other methods and classes here
public static class ObservableExtensions
{
    public static IObservable<TSource> MergeSotwWithLive<TSource>(
        this IObservable<TSource> live,
        Func<Task<List<TSource>>> sotwFactory)
    {
        return Observable.Create<TSource>(async o =>
        {       
            // Naïve indefinite caching, no error checking anywhere             
            var liveReplay = new ReplaySubject<TSource>();
            live.Subscribe(liveReplay);
            // No error checking, no timeout, no cancellation support
            var sotw = await sotwFactory();
            foreach(var evt in sotw)
            {
                o.OnNext(evt);
            }                               
                        
            // note naive disposal
            // and extremely naive de-duping (it really needs to compare
            // on some unique id)
            // we are only supporting disposal once the sotw is sent            
            return liveReplay.Where(evt => !sotw.Any(s => s.Equals(evt)))
                    .Subscribe(o);                  
        });
    }
}

这篇关于这两个可观察的操作是否等效?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆