我怎么能交替地缓冲和接收流的实时数据流 [英] How can I alternately buffer and flow a live data stream in Rx

查看:82
本文介绍了我怎么能交替地缓冲和接收流的实时数据流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个流。一个是数据的流动(可以是任何类型),另一种是一个布尔流充当栅极。我需要将这些组合成具有以下行为流:


  • 当门打开(最近的值是真实的),那么该数据
    应直通流动

  • 当门关闭(最近的值是假),那么数据
    应缓冲以释放作为单个元件时
    门的下一个开放

  • 解决方案应preserve数据和preserve的所有元素
    为了

我真的不知道如何把这个在一起。我一直在测试的输入是这样的:

  //发出每秒的演示数据流
变种数据流= Observable.Interval(TimeSpan.FromSeconds(1));//即每5秒钟切换演示标志流
无功切换= FALSE;
变种gateStream = Observable.Interval(TimeSpan.FromSeconds(5))
    。选择(_ =>!=切换切换);


解决方案

我会做如下:


  • 窗口中使用门流作为闭幕选择数据流

  • 我们可以使用 DistinctUntilChanged 在栅流,以确保没有重复值

  • 我们也将迫使栅极流以开始闭合(假) - 它不会影响输出,并允许一个巧妙的方法

  • 然后使用选择,让每个元素的索引号的过载。有了这个,我们可以告诉如果我们需要缓冲区,或者只是发出窗口就是,因为我们知道,偶数窗口是缓冲(因为我们确信闸门流假开始)

  • 我们可以使用了ToList(),直到它关闭缓冲每个连窗 - 这是实际上相当于一个缓冲器()的等待,直到 OnCompleted

  • 我们使用一个身份的SelectMany 扁平化缓冲窗口

  • 最后,我们并置窗,以保证顺序是preserved

它看起来是这样的:

  dataStream.Window(gateStream.StartWith(假).DistinctUntilChanged())
          。选择((W,I)= I标记%2 == 0 w.ToList()的SelectMany(X =&X的催化剂):。w)的
          .Concat()
          .Subscribe(Console.WriteLine);

I have two streams. One is a flow of data (could be any type), the other is a boolean stream acting as a gate. I need to combine these into a stream that has the following behaviour:

  • When the gate is open (most recent value was true) then the data should flow straight through
  • When the gate is closed (most recent value was false) then the data should be buffered to be released as individual elements when the gate is next open
  • The solution should preserve all elements of the data and preserve order

I am not really sure how to put this together. The inputs I have been testing with are like this:

// a demo data stream that emits every second
var dataStream = Observable.Interval(TimeSpan.FromSeconds(1));

// a demo flag stream that toggles every 5 seconds
var toggle = false;
var gateStream = Observable.Interval(TimeSpan.FromSeconds(5))
    .Select(_ => toggle = !toggle);

解决方案

I would do this as follows:

  • Window the data stream using the gate stream as a closing selector
  • We can use DistinctUntilChanged on the gate stream to ensure no repeated values
  • We will also force the gate stream to start closed (false) - it won't affect the output and allows a neat trick
  • Then use the overload of Select that gives each element an index number. With this we can tell if we need to buffer or just emit the window as is, because we know that even-numbered windows are for buffering (because we made sure the gate stream starts with false)
  • We can use ToList() to buffer each even window until it closes - this is the actually equivalent of a Buffer() that waits until OnCompleted
  • We use an identity SelectMany to flatten the buffered windows
  • Finally we concatenate the windows in order to guarantee order is preserved

It looks like this:

dataStream.Window(gateStream.StartWith(false).DistinctUntilChanged())
          .Select((w, i) =>  i % 2 == 0 ? w.ToList().SelectMany(x => x) : w)
          .Concat()
          .Subscribe(Console.WriteLine);

这篇关于我怎么能交替地缓冲和接收流的实时数据流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆