如何重复一个可观察到的序列,直到它为空? [英] How to repeat an observable sequence until it's empty?

查看:0
本文介绍了如何重复一个可观察到的序列,直到它为空?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个IObservable<int>序列,它在前9次订阅时发出单个项目,在进一步订阅时不发出任何内容并立即完成:

int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
    if (++counter < 10)
        return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
    else
        return Observable.Empty<int>();
});
现在我想重复这个序列,直到它完成为止。所以我使用了Repeat运算符:

source
    .Repeat()
    .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
    .Wait();

问题是该查询永远不会完成。Repeat将一次又一次地订阅source序列。更糟糕的是,当source停止生成元素时,查询进入一个无情的死循环,劫持了CPU的一个核心(我的四核机器报告连续的CPU利用率为25%)。以下是上述代码的输出:

1
2
3
4
5
6
7
8
9
我想要的是Repeat运算符的一个变体,它在source停止生成元素时停止重复source。通过搜索内置的Rx运算符,我可以看到RepeatWhen运算符,但显然这只能用于更快地启动下一次重复,而不是完全停止重复:

// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
    this IObservable<TSource> source,
    Func<IObservable<object>, IObservable<TSignal>> handler);

我不能100%确定,因为handler参数的描述相当模糊,所以我可能遗漏了一些东西:

为每个观察者调用的函数,并获取可观察序列对象。它应该返回任意项的可观测项,响应于从源可观测项接收到完成信号,应该用信号通知该任意项。如果此可观察到的信号为终端事件,则序列将改为使用该信号终止。

我的问题是:如何实现重复source序列直到它为空的RepeatUntilEmpty运算符?是否可以基于上述RepeatWhen运算符实现?如果不是,我是否应该从低级(Observable.Create)开始重新实现基本的Repeat功能?或者,我可以使用Materialize运算符,以某种方式将其与现有的Repeat结合使用吗?我现在没有主意了。我愿意接受任何一种解决方案,无论是高水平还是低水平。

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    // What to do?
}

将我的原始代码中的Repeat替换为RepeatUntilEmpty,应该可以在发出9元素后立即完成查询。

推荐答案

您可以使用Materialize()/Dematerialize()根据从Repeat()语句收到的通知构建您自己的notifications序列。通知序列如下所示:

1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...

因此,我们寻找两个连续的OnCompleted通知。如果没有找到,我们仍然返回收到的OnNext通知,否则返回OnCompleted通知。代码可能如下所示:

public static void Main(string[] args)
{
    int counter = 0;
    IObservable<int> source = Observable.Defer(() =>
    {
        Console.WriteLine($"counter is now: {counter}");
        if (counter > 20) {
            System.Environment.Exit(1);
        }
        if (++counter < 10)
            return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
        else
            return Observable.Empty<int>();
    });

    source
        .RepeatUntilEmpty()
        .Subscribe(x => {

                System.Threading.Thread.Sleep(10);
                Console.WriteLine($"SUBSCRIBE: {x}");
            }, () => Console.WriteLine("SUBSCRIBE:Completed"));

    System.Threading.Thread.Sleep(10000);
    Console.WriteLine("Main thread terminated");
}

RepeatUntilEmpty()方法如下:

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    return source
        .Materialize()
        .Repeat()
        .StartWith((Notification<T>)null)
        .Buffer(2, 1)
        .Select(it => {
            Console.WriteLine($"Buffer content: {String.Join(",", it)}");
            if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
                return it[1];
            }
            // it[1] is OnCompleted, check the previous one
            if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
                // not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
                return null;
            }

            // okay, we have two consecutive OnCompleted, stop this observable.
            return it[1];
        })
        .Where(it => it != null) // remove the NULL marker
        .Dematerialize();
}

这将生成以下输出:

counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated
我还没有测试该代码如何处理OnError()通知,因此您可能需要检查一下。此外,我还遇到了一些问题,即source.Materialize().Repeat()部分将从原始源读取更多数据,尽管它后来决定停止可观测数据。特别是使用Do().Wait()语句时,我有时会收到其他输出,如:

counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14

这可能也是您的问题,因为Repeat()部分仍在尝试读取/连接空的可观察对象。

这篇关于如何重复一个可观察到的序列,直到它为空?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆