在前一个元素匹配条件之后获取流的第一个元素 [英] Take first elements of stream after previous element matches condition

查看:52
本文介绍了在前一个元素匹配条件之后获取流的第一个元素的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是反应性扩展(rx)的新手,并尝试在.NET中执行以下操作(但在JS和其他语言中应相同):

I'm new to reactive extensions (rx) and try to do the following thing in .NET (but should be the same in JS and other languages.):

我有一个带有包含字符串和bool属性的对象的流.流将是无限的.我具有以下条件:

I have a stream incoming with objects containing a string and a bool property. The stream would be infinite. I have the following conditions:

  • 应该始终打印第一个对象.
  • 现在应跳过所有传入的对象,直到将bool属性设置为"true"的对象到达为止.
  • 当一个对象的bool属性设置为"true"时,应跳过该对象,但应打印下一个对象(无论这些属性是什么).
  • 现在这种方式继续进行,应该打印出属性设置为true的对象之后的每个对象.

示例:

("one", false)--("two", true)--("three", false)--("four", false)--("five", true)--("six", true)--("seven", true)--("eight", false)--("nine", true)--("ten", false)

预期结果:

"one"--"three"--"six"--"seven"--"eight"--"ten"

请注意,已经打印了六个"和七个",因为它们跟随属性设置为true的对象,即使它们自己的属性也设置为"true".

Beware that "six" and "seven" have been printed because they follow an object with the property set to true, even if their own property is also set to "true".

一个简单的.NET程序对其进行测试:

Simple .NET program to test it:

using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {               
            var source =
               Observable.Create<Result>(f =>
               {
                   f.OnNext(new Result() { Text = "one", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "two", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "three", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "four", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "five", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "six", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "seven", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "eight", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "nine", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "ten", Flag = false });

                   return () => Console.WriteLine("Observer has unsubscribed");
               });
        }
    }
}

我尝试使用 .Scan .Buffer 扩展名,但我不知道在我的方案中如何使用它们.

I tried to use the .Scan and .Buffer extensions but I don't know how exactly to use them in my scenario.

性能当然应该尽可能的好,因为最终流将是无限的.

Performance of course should be as good as possible, cause in the end the stream would be infinite.

推荐答案

尝试以下方法:

var results = new[]
{
    new Result() { Text = "one", Flag = false },
    new Result() { Text = "two", Flag = true },
    new Result() { Text = "three", Flag = false },
    new Result() { Text = "four", Flag = false },
    new Result() { Text = "five", Flag = true },
    new Result() { Text = "six", Flag = true },
    new Result() { Text = "seven", Flag = true },
    new Result() { Text = "eight", Flag = false },
    new Result() { Text = "nine", Flag = true },
    new Result() { Text = "ten", Flag = false },
};

IObservable<Result> source =
    Observable
        .Generate(
            0, x => x < results.Length, x => x + 1,
            x => results[x],
            x => TimeSpan.FromSeconds(1.0));

与您的 Observable.Create< Result> 方法相比,以上内容只是以一种更加惯用的方式生成了 source .

The above just produces the source in a more idiomatic way than your Observable.Create<Result> approach.

现在是查询:

IObservable<Result> query =
    source
        .StartWith(new Result() { Flag = true })
        .Publish(ss =>
            ss
                .Skip(1)
                .Zip(ss, (s1, s0) =>
                    s0.Flag
                    ? Observable.Return(s1) 
                    : Observable.Empty<Result>())
                .Merge());

此处使用 .Publish 允许可观察的源仅具有一个订阅,但是可以在 .Publish 方法中多次使用该订阅.然后可以使用标准的 Skip(1).Zip 方法来检查随后产生的值.

The use of .Publish here allows the source observable to have only one subscription, but for it to be used multiple times within the .Publish method. Then the standard Skip(1).Zip approach can be used to inspect the subsequent values being produced.

这是输出:

从Shlomo获得灵感之后,这是我使用 .Buffer(2,1)的方法:

After inspiration from Shlomo, here's my approach using .Buffer(2, 1):

IObservable<Result> query2 =
    source
        .StartWith(new Result() { Flag = true })
        .Buffer(2, 1)
        .Where(rs => rs.First().Flag)
        .SelectMany(rs => rs.Skip(1));

这篇关于在前一个元素匹配条件之后获取流的第一个元素的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆