How to fix the inconsistency of the Publish().RefCount() behavior?

Question

Recently I stumbled upon an interesting statement by Enigmativity about the Publish and RefCount operators:

You're using the dangerous .Publish().RefCount() operator pair which creates a sequence that can't be subscribed to after it completes.

This statement seems to oppose Lee Campbell's assessment about these operators. Quoting from his book Intro to Rx:

The Publish/RefCount pair is extremely useful for taking a cold observable and sharing it as a hot observable sequence for subsequent observers.

Initially I didn't believe that Enigmativity's statement was correct, so I tried to refute it. My experiments revealed that Publish().RefCount() can indeed be inconsistent. Subscribing a second time to a published sequence may or may not cause a new subscription to the source sequence, depending on whether the source sequence completed while connected. If it completed, it won't be resubscribed. If it did not complete, it will be resubscribed. Here is a demonstration of this behavior:

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var observable = Observable
    .Create<int>(o =>
    {
        o.OnNext(13);
        o.OnCompleted(); // Commenting this line alters the observed behavior
        return Disposable.Empty;
    })
    .Do(x => Console.WriteLine($"Producer generated: {x}"))
    .Finally(() => Console.WriteLine($"Producer finished"))
    .Publish()
    .RefCount()
    .Do(x => Console.WriteLine($"Consumer received #{x}"))
    .Finally(() => Console.WriteLine($"Consumer finished"));

observable.Subscribe().Dispose();
observable.Subscribe().Dispose();

In this example the observable is composed of three parts. First is the producing part, which generates a single value and then completes. Then follows the publishing mechanism (Publish+RefCount). Finally comes the consuming part, which observes the values emitted by the producer. The observable is subscribed twice. The expected behavior would be that each subscription receives one value. But this is not what happens! Here is the output:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Consumer finished

And here is the output if we comment out the o.OnCompleted(); line. This subtle change results in behavior that is expected and desirable:

Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished

In the first case the cold producer (the part before the Publish().RefCount()) was subscribed only once. The first consumer received the emitted value, but the second consumer received nothing (except for an OnCompleted notification). In the second case the producer was subscribed twice. Each time it generated a value, and each consumer got one value.

My question is: how can we fix this? How can we modify either the Publish operator, or the RefCount, or both, in order to make them behave always consistently and desirably? Below are the specifications of the desirable behavior:

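  1. The published sequence should propagate to its subscribers all notifications coming directly from the source sequence, and nothing else.
  2. The published sequence should subscribe to the source sequence when its current number of subscribers increases from zero to one.
  3. The published sequence should stay connected to the source as long as it has at least one subscriber.
  4. The published sequence should unsubscribe from the source when its current number of subscribers becomes zero.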

I am asking for either a custom PublishRefCount operator that offers the functionality described above, or for a way to achieve the desirable functionality using the built-in operators.

By the way, a similar question exists that asks why this happens. My question is about how to fix it.

Update: In retrospect, the above specification results in unstable behavior that makes race conditions unavoidable. There is no guarantee that two subscriptions to the published sequence will result in a single subscription to the source sequence. The source sequence may complete between the two subscriptions, causing the unsubscription of the first subscriber, which causes the RefCount operator to unsubscribe, which in turn causes a new subscription to the source for the next subscriber. The behavior of the built-in .Publish().RefCount() prevents this from happening.

The moral lesson is that the .Publish().RefCount() sequence is not broken, but it's not reusable. It cannot be used reliably for multiple connect/disconnect sessions. If you want a second session, you should create a new .Publish().RefCount() sequence.
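As a minimal sketch of that last point, the snippet below builds a fresh published sequence for every session instead of reusing a completed one. It assumes the System.Reactive package and C# top-level statements; NewSession is a hypothetical helper name, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var producerRuns = 0;
var received = new List<int>();

// Cold source from the question: emits one value and completes synchronously.
var source = Observable.Create<int>(o =>
{
    producerRuns++;
    o.OnNext(13);
    o.OnCompleted();
    return Disposable.Empty;
});

// Hypothetical helper: every call returns a brand-new published sequence,
// with its own internal subject and its own reference count.
IObservable<int> NewSession() => source.Publish().RefCount();

// Two sessions, each on a fresh sequence, instead of reusing a completed one.
NewSession().Subscribe(x => received.Add(x)).Dispose();
NewSession().Subscribe(x => received.Add(x)).Dispose();

Console.WriteLine($"Producer runs: {producerRuns}, received: {string.Join(", ", received)}");
```

Within a single session the sequence can still be shared by several overlapping subscribers; only the reuse after completion is avoided.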

Recommended answer

Lee does a good job explaining IConnectableObservable, but Publish isn't explained that well. It's a pretty simple animal, just hard to explain. I'll assume you understand IConnectableObservable:

If we were to re-implement the zero-parameter Publish function simply and lazily, it would look something like this:

//  For illustrative purposes only: don't use this code
public class PublishObservable<T> : IConnectableObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Subject<T> _proxy = new Subject<T>();
    private IDisposable _connection;
    
    public PublishObservable(IObservable<T> source)
    {
        _source = source;
    }
    
    public IDisposable Connect()
    {
        // Subscribe the proxy to the source only if not already connected.
        if (_connection == null)
            _connection = _source.Subscribe(_proxy);

        // Disposing the returned handle disconnects and allows a later reconnect.
        return Disposable.Create(() =>
        {
            _connection?.Dispose();
            _connection = null;
        });
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Observers always attach to the proxy, never directly to the source.
        return _proxy.Subscribe(observer);
    }
}

public static class X
{
    public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
    {
        return new PublishObservable<T>(source);
    }
}

Publish creates a single proxy Subject which subscribes to the source observable. The proxy can subscribe/unsubscribe to source based on the connection: Call Connect, and proxy subscribes to source. Call Dispose on the connection disposable and the proxy unsubscribes from source. The important thing to take away from this is that there is a single Subject that proxies any connection to the source. You're not guaranteed only one subscription to source, but you are guaranteed one proxy and one concurrent connection. You can have multiple subscriptions via connecting/disconnecting.
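To see the proxy behavior with the real operator rather than the illustrative class, here is a small sketch that connects by hand. It assumes the System.Reactive package and top-level statements; the counter and log names are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var sourceSubscriptions = 0;
var log = new List<string>();

// Cold source that counts how often it is subscribed and never completes.
var source = Observable.Create<int>(o =>
{
    sourceSubscriptions++;
    o.OnNext(1);
    o.OnNext(2);
    return Disposable.Empty;
});

var published = source.Publish();

// Subscribing only attaches observers to the proxy; the source is not touched yet.
var a = published.Subscribe(x => log.Add($"A{x}"));
var b = published.Subscribe(x => log.Add($"B{x}"));
var subscriptionsBeforeConnect = sourceSubscriptions;

// Connect makes the single subscription to the source on behalf of both observers.
var connection = published.Connect();

Console.WriteLine($"Before connect: {subscriptionsBeforeConnect}, after: {sourceSubscriptions}");
Console.WriteLine(string.Join(" ", log));

connection.Dispose();
a.Dispose();
b.Dispose();
```

Both observers see the same two values from one run of the source, which is the sharing that RefCount later automates.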

RefCount handles the Connect-calling part of things. Here's a simple re-implementation:

//  For illustrative purposes only: don't use this code
public class RefCountObservable<T> : IObservable<T>
{
    private readonly IConnectableObservable<T> _source;
    private IDisposable _connection;
    private int _refCount = 0;

    public RefCountObservable(IConnectableObservable<T> source)
    {
        _source = source;
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = _source.Subscribe(observer);
        var disposable = Disposable.Create(() =>
        {
            subscription.Dispose();
            DecrementCount();
        });
        if(++_refCount == 1)
            _connection = _source.Connect();
            
        return disposable;
    }

    private void DecrementCount()
    {
        if(--_refCount == 0)
            _connection.Dispose();
    }
}
public static class X
{
    public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source)
    {
        return new RefCountObservable<T>(source);
    }

}

A bit more code, but still pretty simple: Call Connect on the ConnectableObservable if refcount goes up to 1, disconnect if it goes down to 0.

Put the two together, and you get a pair that guarantees that there will only be one concurrent subscription to a source observable, proxied through one persistent Subject. The Subject will only be subscribed to the source while there are more than zero downstream subscriptions.

Given that introduction, there are a lot of misconceptions in your question, so I'll go over them one by one:

... Publish().RefCount() can be indeed inconsistent. Subscribing a second time to a published sequence can cause a new subscription to the source sequence, or not, depending on whether the source sequence was completed while connected. If it was completed, then it won't be resubscribed. If it was not completed, then it will be resubscribed.

.Publish().RefCount() will subscribe anew to source under one condition only: When it goes from zero subscribers to 1. If the count of subscribers goes from 0 to 1 to 0 to 1 for any reason then you will end up re-subscribing. The source observable completing will cause RefCount to issue an OnCompleted, and all of its observers unsubscribe. So subsequent subscriptions to RefCount will trigger an attempt to resubscribe to source. Naturally if source is observing the observable contract properly it will issue an OnCompleted immediately and that will be that.
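The 0 to 1 to 0 to 1 case can be made visible by counting source subscriptions. This is only a sketch, assuming the System.Reactive package, top-level statements, and a source that never completes; the variable names are illustrative.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var sourceSubscriptions = 0;

// Cold source that never completes, so only the ref-count drives connect/disconnect.
var source = Observable.Create<int>(o =>
{
    sourceSubscriptions++;
    o.OnNext(13);
    return Disposable.Empty;
});

var shared = source.Publish().RefCount();

// Two overlapping subscribers: the count goes 0 -> 1 -> 2, with one connection.
var first = shared.Subscribe(_ => { });
var second = shared.Subscribe(_ => { });
var whileOverlapping = sourceSubscriptions;

// Count drops back to 0, so RefCount disconnects from the source.
first.Dispose();
second.Dispose();

// Count goes 0 -> 1 again, so the source is subscribed anew.
var third = shared.Subscribe(_ => { });
var afterReconnect = sourceSubscriptions;
third.Dispose();

Console.WriteLine($"While overlapping: {whileOverlapping}, after reconnect: {afterReconnect}");
```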

[see sample observable with OnCompleted...] The observable is subscribed twice. The expected behavior would be that each subscription will receive one value.

No. The expected behavior is that the proxy Subject after issuing an OnCompleted will re-emit an OnCompleted to any subsequent subscription attempt. Since your source observable completes synchronously at the end of your first subscription, the second subscription will be attempting to subscribe to a Subject that has already issued an OnCompleted. This should result in an OnCompleted, otherwise the Observable contract would be broken.

[see sample observable without OnCompleted as second case...] In the first case the cold producer (the part before the Publish().RefCount()) was subscribed only once. The first consumer received the emitted value, but the second consumer received nothing (except from an OnCompleted notification). In the second case the producer was subscribed twice. Each time it generated a value, and each consumer got one value.

This is correct. Since the proxy Subject never completed, subsequent re-subscriptions to source will result in the cold observable re-running.

My question is: how can we fix this? [..]

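  1. The published sequence should propagate to its subscribers all notifications coming directly from the source sequence, and nothing else.
  2. The published sequence should subscribe to the source sequence when its current number of subscribers increases from zero to one.
  3. The published sequence should stay connected to the source as long as it has at least one subscriber.
  4. The published sequence should unsubscribe from the source when its current number of subscribers becomes zero.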

All of the above currently happens with .Publish and .RefCount, as long as the source doesn't complete or error. I don't suggest implementing an operator that changes that, breaking the Observable contract.

Edit:

I would argue the #1 source of confusion with Rx is Hot/Cold observables. Since Publish can 'warm-up' cold observables, it's no surprise that it should lead to confusing edge cases.

First, a word on the observable contract. The Observable contract stated more succinctly is that an OnNext can never follow an OnCompleted/OnError, and there should be only one OnCompleted or OnError notification. This does leave the edge case of attempts to subscribe to terminated observables: Attempts to subscribe to terminated observables result in receiving the termination message immediately. Does this break the contract? Perhaps, but it's the only contract cheat, to my knowledge, in the library. The alternative is a subscription to dead air. That doesn't help anybody.
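The edge case is easy to reproduce with a plain Subject, which is the kind of proxy Publish relies on. A minimal sketch, assuming the System.Reactive package and top-level statements:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

var log = new List<string>();
var subject = new Subject<int>();

// Early subscriber sees the value and the completion.
subject.Subscribe(x => log.Add($"early: {x}"), () => log.Add("early: completed"));
subject.OnNext(1);
subject.OnCompleted();

// Late subscriber arrives after termination: no values, just the termination message.
subject.Subscribe(x => log.Add($"late: {x}"), () => log.Add("late: completed"));

Console.WriteLine(string.Join(" | ", log));
```

The late subscriber gets OnCompleted right away instead of a subscription to dead air.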

How does this tie into hot/cold observables? Unfortunately, confusingly. A subscription to an ice-cold observable triggers a re-construction of the entire observable pipeline. This means that the subscribe-to-already-terminated rule only applies to hot observables. Cold observables always start anew.

Consider this code, where o is a cold observable:

var o = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(5);
var s1 = o.Subscribe(i => Console.WriteLine(i.ToString()));
await Task.Delay(TimeSpan.FromMilliseconds(600));
var s2 = o.Subscribe(i => Console.WriteLine(i.ToString()));

For the purposes of the contract, the observable behind s1 and observable behind s2 are entirely different. So even though there's a delay between them, and you'll end up seeing OnNext after OnCompleted, that's not a problem, because they are entirely different observables.

Where it gets sticky is with a warmed-up Publish version. If you were to add .Publish().RefCount() to the end of o in the code above...

  • Without changing anything else, s2 would terminate immediately printing nothing.
  • Change the delay to 400 or so, and s2 would print the last two numbers.
  • Change s1 to only .Take(2), and s2 would start over again printing 0 through 4.
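The second bullet can be reproduced without real delays by driving Interval with virtual time. This is a sketch only: it swaps the wall clock for HistoricalScheduler from System.Reactive.Concurrency and uses a 350 ms gap instead of "400 or so" to stay clear of the tick boundary. Both choices are assumptions made for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

var scheduler = new HistoricalScheduler();
var s1Values = new List<long>();
var s2Values = new List<long>();

// Same pipeline as above, warmed up with Publish().RefCount() and run on virtual time.
var o = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler)
    .Take(5)
    .Publish()
    .RefCount();

var s1 = o.Subscribe(x => s1Values.Add(x));

// Let 350 ms of virtual time pass: ticks 0, 1 and 2 go to s1 only.
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(350));

// s2 joins the already-running sequence and only sees what is left.
var s2 = o.Subscribe(x => s2Values.Add(x));
scheduler.AdvanceBy(TimeSpan.FromSeconds(1));

Console.WriteLine($"s1: {string.Join(", ", s1Values)}");
Console.WriteLine($"s2: {string.Join(", ", s2Values)}");
```

Under these assumptions s1 should see all five values while s2 only sees the last two, matching the second bullet.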

Making this nastiness worse is the Schrödinger's cat effect: if you set up an observer on o to watch what would happen the whole time, that changes the ref-count, affecting the functionality! Watching it changes the behavior. Debugging nightmare.

This is the hazard of attempting to 'warm-up' cold observables. It just doesn't work well, especially with Publish/RefCount.

My recommendations are:

  1. Don't try to warm up cold observables.
  2. If you need to share a subscription, with either cold or hot observables, stick with @Enigmativity's general rule of strictly using the selector version of Publish.
  3. If you must, have a dummy subscription on a Publish/RefCount observable. This at least provides a consistent Refcount >= 1, reducing the quantum activity effect.
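Recommendations 2 and 3 might look roughly like the sketch below. It assumes the System.Reactive package and top-level statements; the sources, counters and the pairwise-difference query are invented for the illustration and are not taken from the answer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Recommendation 2: the selector overload of Publish.
var coldSubscriptions = 0;
var cold = Observable.Create<int>(o =>
{
    coldSubscriptions++;
    foreach (var x in new[] { 1, 4, 9, 16 }) o.OnNext(x);
    o.OnCompleted();
    return Disposable.Empty;
});

// 'shared' is used twice inside the selector but costs one source subscription.
// Each subscription to 'deltas' gets its own proxy, so it can be subscribed again.
var deltas = cold.Publish(shared =>
    shared.Zip(shared.Skip(1), (previous, next) => next - previous));

var firstRun = new List<int>();
var secondRun = new List<int>();
deltas.Subscribe(x => firstRun.Add(x));
deltas.Subscribe(x => secondRun.Add(x));

// Recommendation 3: a dummy subscription keeps the ref-count at 1 or above.
var hotSubscriptions = 0;
var ticks = new Subject<int>();
var sharedTicks = Observable.Create<int>(o =>
{
    hotSubscriptions++;
    return ticks.Subscribe(o);
}).Publish().RefCount();

var keepAlive = sharedTicks.Subscribe(_ => { });
sharedTicks.Subscribe(_ => { }).Dispose(); // count 1 -> 2 -> 1, no reconnect
sharedTicks.Subscribe(_ => { }).Dispose(); // count 1 -> 2 -> 1, no reconnect

Console.WriteLine($"Deltas: {string.Join(", ", firstRun)} / {string.Join(", ", secondRun)}");
Console.WriteLine($"Cold subscriptions: {coldSubscriptions}, hot subscriptions: {hotSubscriptions}");
keepAlive.Dispose();
```

The selector version restarts the cold source for each subscriber of the result, so every run is complete and independent. The dummy subscription does the opposite and pins one connection for as long as keepAlive is held.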
