如何制作一个只能订阅一次的轻量级`Replay`运算符? [英] How to make a lightweight `Replay` operator that can be subscribed only once?

查看:38
本文介绍了如何制作一个只能订阅一次的轻量级`Replay`运算符?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在各种情况下,我都希望RxReplay操作符能够缓冲传入的通知,在第一次订阅时同步重放其缓冲区,并在第一次订阅之后停止缓冲。这个轻量级Replay运算符应该只能为一个订阅者提供服务。可以找到这样一个操作符的一个用例here,在第一次订阅之后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望可以避免的有问题的行为的人为示例:

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    .SelectMany(x => Enumerable.Range((int)x * 100_000 + 1, 100_000))
    .Take(800_000)
    .Do(x =>
    {
        if (x % 100_000 == 0) Console.WriteLine(
            $"{DateTime.Now:HH:mm:ss.fff} > " +
            $"Produced: {x:#,0}, TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
    })
    .Replay()
    .AutoConnect(0);

await Task.Delay(2200);

Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");

// First subscription
await observable.Do(x =>
{
    if (x % 100_000 == 0)
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});

// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");

可观测对象总共生成800,000个值。Replay机制立即连接到源,并且在其完成之前订阅了一半。

输出:

16:54:19.893 > Produced: 100,000, TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,000, TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,000, TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,000, TotalMemory: 2,212,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,000, TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,000, TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,000, TotalMemory: 6,411,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,000, TotalMemory: 6,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000

订阅后内存使用量持续增长。这是意料之中的,因为所有值都被缓冲,并在重放的可观察对象的整个生命周期内保持缓冲。理想的行为是在订阅后内存使用量骤降。在传播缓冲值之后,应该丢弃该缓冲区,因为在订阅之后它就没有用处了。另外,第二个订阅(await observable.Count())应该失败,并显示InvalidOperationException。在可观察对象失去Replay功能后,我不想再次订阅它。

下面是我试图实现的自定义ReplayOnce操作符的存根。有谁知道如何实施它吗?

public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
    return source.Replay(); // TODO: enforce the subscribe-once policy
}
有一个相关的问题here,是关于如何使Replay运算符具有可按需偶尔清空的缓冲区的。我的问题不同,因为我希望在订阅后完全禁用缓冲区,并且不再开始增长。

推荐答案

我想出了ReplayOnce运算符的实现,它基于组播自定义ReplayOnceSubject<T>。此主题最初由ReplaySubject<T>支持,在第一次(且仅允许)订阅期间使用正常的Subject<T>进行切换:

public static IConnectableObservable<T> ReplayOnce<T>(
    this IObservable<T> source)
{
    return source.Multicast(new ReplayOnceSubject<T>());
}

private class ReplayOnceSubject<T> : ISubject<T>
{
    private readonly object _locker = new object();
    private ISubject<T> _subject = new ReplaySubject<T>();

    public void OnNext(T value)
    {
        lock (_locker) _subject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        lock (_locker) _subject.OnError(error);
    }

    public void OnCompleted()
    {
        lock (_locker) _subject.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_locker)
        {
            if (_subject is ReplaySubject<T> replaySubject)
            {
                var subject = new Subject<T>();
                var subscription = subject.Subscribe(observer);
                // Now replay the buffered notifications
                replaySubject.Subscribe(subject).Dispose();
                _subject = subject;
                return subscription;
            }
            else
                throw new InvalidOperationException("Already subscribed.");
        }
    }
}
replaySubject.Subscribe(subject)行确保不仅将缓冲值传播到观察者,而且还将传播任何可能的OnError/OnCompleted通知。订阅后,ReplaySubject不再被引用,并且有资格进行垃圾回收。

这篇关于如何制作一个只能订阅一次的轻量级`Replay`运算符?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆