对 Publish().Refcount() 行为的困惑 [英] Confusion over behavior of Publish().Refcount()

查看:43
本文介绍了对 Publish().Refcount() 行为的困惑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这里有一个简单的程序,可以显示不同单词中的字母数.它按预期工作.

I've got a simple program here that displays the number of letters in various words. It works as expected.

static void Main(string[] args) {
    var word = new Subject<string>();
    var wordPub = word.Publish().RefCount();
    var length = word.Select(i => i.Length);
    var report =
        wordPub
        .GroupJoin(length,
            s => wordPub,
            s => Observable.Empty<int>(),
            (w, a) => new { Word = w, Lengths = a })
        .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
    word.OnNext("Apple");
    word.OnNext("Banana");
    word.OnNext("Cat");
    word.OnNext("Donkey");
    word.OnNext("Elephant");
    word.OnNext("Zebra");
    Console.ReadLine();
}

输出为:

Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5

我使用了 Publish().RefCount() 因为wordpub"两次包含在report"中.没有它,当首先发出一个单词时,报告的一部分将通过回调得到通知,然后报告的另一部分将得到通知,通知加倍.这就是发生的事情;输出最终有 11 个项目而不是 6 个.至少我认为是这样.我认为在这种情况下使用 Publish().RefCount() 作为同时更新报告的两个部分.

I used the Publish().RefCount() because "wordpub" is included in "report" twice. Without it, when a word is emitted first one part of the report would get notified by a callback, and then the other part of report would be notified, double the notifications. That is kindof what happens; the output ends up having 11 items rather than 6. At least that is what I think is going on. I think of using Publish().RefCount() in this situation as simultaneously updating both parts of the report.

但是,如果我将长度函数更改为也使用已发布的源代码:

However if I change the length function to ALSO use the published source like this:

var length = wordPub.Select(i => i.Length);

然后输出是这样的:

Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5

为什么长度函数不能也使用相同的发布源?

Why can't the length function also use the same published source?

推荐答案

这是一个需要解决的巨大挑战!发生这种情况的条件如此微妙.提前为冗长的解释道歉,但请耐心等待!

This was a great challenge to solve! So subtle the conditions that this happens. Apologies in advance for the long explanation, but bear with me!

TL;DR

对已发布源的订阅按顺序处理,但在任何其他直接对未发布源的订阅之前处理.即你可以插队!使用 GroupJoin 订阅顺序对于确定窗口何时打开和关闭很重要.

Subscriptions to the published source are processed in order, but before any other subscription directly to the unpublished source. i.e. you can jump the queue! With GroupJoin subscription order is important to determine when windows open and close.

我首先担心的是您正在发布引用一个主题.这应该是一个空操作.Subject 没有订阅费用.

My first concern would be that you are publish refcounting a subject. This should be a no-op. Subject<T> has no subscription cost.

因此,当您删除 Publish().RefCount() 时:

So when you remove the Publish().RefCount() :

var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);

然后你会遇到同样的问题.

then you get the same issue.

然后我查看 GroupJoin(因为我的直觉表明 Publish().Refcount() 是一个红鲱鱼).对我来说,仅靠眼球很难合理化,所以我也依靠简单的调试我已经使用了几十次 - TraceLog 扩展方法.

So then I look to the GroupJoin (because my intuition suggests that Publish().Refcount() is a red herring). For me, eyeballing this alone was too hard to rationalise, so I lean on a simple debugging too I have used dozens of times of the years - a Trace or Log extension method.

public interface ILogger
{
    void Log(string input);
}
public class DumpLogger : ILogger
{
    public void Log(string input)
    {
        //LinqPad `Dump()` extension method. 
        //  Could use Console.Write instead.
        input.Dump();
    }
}


public static class ObservableLoggingExtensions
{
    private static int _index = 0;

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
    {
        return Observable.Create<T>(o =>
        {
            var index = Interlocked.Increment(ref _index);
            var label = $"{index:0000}{name}";
            logger.Log($"{label}.Subscribe()");
            var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
            var subscription = source
                .Do(
                    x => logger.Log($"{label}.OnNext({x.ToString()})"),
                    ex => logger.Log($"{label}.OnError({ex})"),
                    () => logger.Log($"{label}.OnCompleted()")
                )
                .Subscribe(o);

            return new CompositeDisposable(subscription, disposed);
        });
    }
}

当我将日志添加到您提供的代码时,它看起来像这样:

When I add the logging to your provided code it looks like this:

var logger = new DumpLogger();

var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
    wordPub.Log(logger, "lhs")
    .GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
        s => wordPub.Log(logger, "lhsDuration"),
        s => Observable.Empty<int>().Log(logger, "rhsDuration"),
        (w, a) => new { Word = w, Lengths = a })
    .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");

这将在我的日志中输出如下内容

This will then output in my log something like the following

使用 Publish().RefCount() 记录

0001lhs.Subscribe()             
0002rhs.Subscribe()             
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()     
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()     
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()       
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6 
...

但是,当我删除用法 Publish().RefCount() 时,新的日志输出如下:

However when I remove the usage Publish().RefCount() the new log output is as follows:

没有主题的日志

0001lhs.Subscribe()                 
0002rhs.Subscribe()                 
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()         
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()         
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Apple 6 

    OnNext
    Banana 6 

0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...

这为我们提供了一些见解,但是当我们开始使用逻辑订阅列表注释日志时,问题才真正变得清晰.

This gives us some insight, however when the issue really becomes clear is when we start annotating our logs with a logical list of subscriptions.

在带有 RefCount 的原始(工作)代码中,我们的注释可能如下所示

In the original (working) code with the RefCount our annotations might look like this

//word.Subsribers.Add(wordPub)

0001lhs.Subscribe()             //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe()             //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()       //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6 

所以在这个例子中,当 word.OnNext("Banana"); 被执行时,观察者链是按这个顺序链接的

So in this example, when word.OnNext("Banana"); is executed the chain of observers is linked in this order

  1. wordPub
  2. 0002rhs

但是wordPub 有子订阅!所以真正的订阅列表看起来像

However, wordPub has child subscriptions! So the real subscription list looks like

  1. wordPub
  1. wordPub
  1. 0001lhs
  2. 0003lhsDuration
  3. 0005lhsDuration

  • 0002rhs
  • 如果我们只注释主题日志,我们就会看到微妙之处

    If we annotate the Subject only log we see where the subtlety lies

    0001lhs.Subscribe()                 //word.Subsribers.Add(0001lhs)
    0002rhs.Subscribe()                 //word.Subsribers.Add(0002rhs)
    0001lhs.OnNext(Apple)
    0003lhsDuration.Subscribe()         //word.Subsribers.Add(0003lhsDuration)
    0002rhs.OnNext(5)
    0004rhsDuration.Subscribe()
    0004rhsDuration.OnCompleted()
    0004rhsDuration.Dispose()
    
        OnNext
        Apple 5 
    
    0001lhs.OnNext(Banana)
    0005lhsDuration.Subscribe()         //word.Subsribers.Add(0005lhsDuration)
    0002rhs.OnNext(6)
    0006rhsDuration.Subscribe()
    0006rhsDuration.OnCompleted()
    0006rhsDuration.Dispose()
    
        OnNext
        Apple 6 
    
        OnNext
        Banana 6 
    
    0003lhsDuration.OnNext(Banana)
    0003lhsDuration.Dispose()
    

    所以在这个例子中,当 word.OnNext("Banana"); 被执行时,观察者链是按这个顺序链接的

    So in this example, when word.OnNext("Banana"); is executed the chain of observers is linked in this order

    1. 0001lhs
    2. 0002rhs
    3. 0003lhsDuration
    4. 0005lhsDuration
    

    由于 0003lhsDuration 订阅在 0002rhs 之后被激活,它不会看到终止窗口的香蕉"值,直到 rhs 之后> 已发送值,因此在仍然打开的窗口中产生它.

    As the 0003lhsDuration subscription is activated after the 0002rhs, it wont see the "Banana" value to terminate the window, until after the rhs has been sent the value, thus yielding it in the still open window.

    正如@francezu13k50 指出的,解决您的问题的显而易见且简单的方法是使用 word.Select(x => new { Word = x, Length = x.Length });,但是我认为您已经为我们提供了您真正问题的简化版本(赞赏),我理解为什么这不合适.但是,因为我不知道您真正的问题空间是什么,所以我不确定要向您建议什么来提供解决方案,除了您拥有当前代码的解决方案,现在您应该知道它为什么会这样工作.

    As @francezu13k50 points out the obvious and simple solution to your problem is to just use word.Select(x => new { Word = x, Length = x.Length });, but as I think you have given us a simplified version of your real problem (appreciated) I understand why this isn't suitable. However, as I dont know what your real problem space is I am not sure what to suggest to you to provide a solution, except that you have one with your current code, and now you should know why it works the way it does.

    这篇关于对 Publish().Refcount() 行为的困惑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆