为什么在所有初始订阅者都断开连接后,RefCount无法正常工作?(redux) [英] Why is RefCount not working after all initial subscribers disconnect? (redux)

查看:42
本文介绍了为什么在所有初始订阅者都断开连接后,RefCount无法正常工作?(redux)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据Lee Campbell的要求,这是

As requested by Lee Campbell, this is a follow-on question to this original. It is intended to present the question in the context of the use case I was attempting to solve.

我有一个 WebApiService ,它包装了原始Web API并提供令牌管理.也就是说,它会跟踪身份验证令牌,并将其传递给原始API.这是 WebApiService 中的公共方法之一的示例:

I have a WebApiService that wraps a raw web API and provides token management. That is, it keeps track of the authentication token, passing it through to the raw API. Here's an example of one of the public methods in the WebApiService:

public IObservable<User> UpdateUserAsync(int id, UpdateUserRequest request) =>
    this
        .EnsureAuthenticatedAsync()
        .SelectMany(
            _ =>
                this
                    .rawWebApi
                    .UpdateUserAsync(this.TokenValue, id, request));

您可以在转发到原始Web API之前,先调用 EnsureAuthenticatedAsync ,然后使用 this.TokenValue 传递令牌.

As you can, it simply calls EnsureAuthenticatedAsync prior to forwarding onto the raw web API, passing in the token using this.TokenValue.

EnsureAuthenticatedAsync 方法如下所示:

public IObservable<Unit> EnsureAuthenticatedAsync() =>
    this
        .Token
        .FirstAsync()
        .SelectMany(token => string.IsNullOrWhiteSpace(token) ? this.authenticate : Observable.Return(Unit.Default));

我最初的问题是因为我尝试编写身份验证管道(上面的 this.authenticate ).请注意,这是用单个可观察对象替换整个 EnsureAuthenticatedAsync 方法的第一步.

My original question was spurred by my attempts to write the authentication pipeline (this.authenticate in the above). Note that this was the first step towards replacing the entire EnsureAuthenticatedAsync method with a single observable.

对于 authenticate ,我想要一个可观察到的内容:

For authenticate, I wanted an observable that:

  1. 什么都不会做,直到有人订阅(冷/懒)
  2. 即使一次有多个订阅者,它也只能工作一次
  3. 如果所有订阅者都断开连接,
  4. 会再次执行
  1. does nothing until someone subscribes (cold/lazy)
  2. only does its work once, even if there are multiple subscribers at once
  3. does its work again if all subscribers disconnect

为此,我想到了这样的东西:

To that end, I came up with something like this:

this.authenticate = Observable
    .Defer(() =>
        Observable
            .Create<Unit>(
                async (observer, cancellationToken) =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var result = await this
                            .authenticationService
                            .AuthenticateAsync(this);

                        if (result.WasSuccessful)
                        {
                            observer.OnNext(Unit.Default);
                            observer.OnCompleted();

                            return;
                        }
                    }
                }))
    .Publish()
    .RefCount();

这里的想法是允许任意数量的同时调用 WebApiService 方法,以导致执行单个身份验证循环.一旦通过身份验证,所有订户将完成,任何将来的订户将意味着我们需要重新进行身份验证,从而重新执行延迟的可观察对象.

The idea here is to allow any number of simultaneous calls to WebApiService methods to result in a single authentication loop being executed. Once authenticated, all subscribers will complete and any future subscriber would mean we need to re-authenticate again and thus re-execute the deferred observable.

当然,以上可观察对象与我原始问题中的简化对象存在相同的问题:一旦延迟可观察对象完成一次, Publish 将立即完成任何将来的可观察对象(尽管延迟可观察对象为重新请求).

Of course, the above observable suffers from the same problem as the simplified one in my original question: once the deferred observable completes once, Publish will immediately complete any future observable (despite the deferred observable being re-requested).

如上所述,我的最终目标是用只在令牌为null时执行此身份验证的管道替换 EnsureAuthenticatedAsync .但这是第二步,而我却失败了:)

As hinted above, my ultimate goal would be to replace EnsureAuthenticatedAsync altogether with a pipeline that only executes this authentication when the token is null. But that was step two, and I failed at one :)

因此,让我们回到最初的问题:是否有一种方法可以编写一个管道,无论当前订阅者的数量如何,该管道都将执行一次,但是如果所有订阅者都断开连接并再次重新连接,则该管道会再次执行?

So to bring this back to the original question: is there a way of writing a pipeline that will execute once regardless of the number of current subscribers, but execute again if all subscribers disconnect and reconnect again?

推荐答案

可观察的序列不能完成一次以上.您要删除的是 OnCompleted 调用,以使 authenticate 不能多次完成,然后将 .Take(1)添加到 EnsureAuthenticatedAsync ,以便要进行 authenticate subscription 会在一个值之后完成.

Observable sequences can't complete more than once. What you want here is to remove the OnCompleted call so that authenticate can't complete more than once, and add .Take(1) to EnsureAuthenticatedAsync so that the subscription to authenticate will complete after one value.

以下是正在运行的控制台应用程序.用 obs 替换对 obs1 (具有 Take )的引用,以重现您的问题.在这两种情况下,您都可以快速按Enter键,以使所有四个订户获得相同的值.

Below is a working console app. Replace references to obs1 (which has Take) with obs to reproduce your problem. In both cases you can press enter quickly to have all four subscribers get the same value.

class Program
{
    static int value = 0;

    static void Main(string[] args)
    {
        var obs = Observable.Create<int>(observer =>
        {
            Console.WriteLine("Generating");

            Interlocked.Increment(ref value);

            return Observable.Return(value)
                .Delay(TimeSpan.FromSeconds(1))
                .Subscribe(observer);
        })
        .Publish() 
        .RefCount();

        var obs1 = obs.Take(1);

        obs1.Subscribe(
            i => Console.WriteLine("First {0}", i), 
            () => Console.WriteLine("First complete"));
        obs1.Subscribe(
            i => Console.WriteLine("Second {0}", i), 
            () => Console.WriteLine("Second complete"));

        Console.ReadLine();

        obs1.Subscribe(
            i => Console.WriteLine("Third {0}", i), 
            () => Console.WriteLine("Third complete"));
        obs1.Subscribe(
            i => Console.WriteLine("Fourth {0}", i), 
            () => Console.WriteLine("Fourth complete"));

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }
}

这篇关于为什么在所有初始订阅者都断开连接后,RefCount无法正常工作?(redux)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆