“缓冲直到安静"来自 Reactive 的行为? [英] "Buffer until quiet" behavior from Reactive?

查看:26
本文介绍了“缓冲直到安静"来自 Reactive 的行为?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的问题有点像 Nagle 算法 旨在解决的问题,但不完全是.我想要的是将来自 IObservableOnNext 通知缓冲到一系列 IObservable>像这样:

My problem is sort of like the one the Nagle algorithm was created to solve, but not exactly. What I'd like is to buffer the OnNext notifications from an IObservable<T> into a sequence of IObservable<IList<T>>s like so:

  1. 当第一个 T 通知到达时,将其添加到缓冲区并开始倒计时
  2. 如果另一个T通知在倒计时结束前到达,则将其添加到缓冲区并重新开始倒计时
  3. 一旦倒计时结束(即生产者已经沉默了一段时间),将所有缓冲的 T 通知作为单个聚合 IList 通知转发.
  4. 如果缓冲区大小在倒计时结束前增长超过某个最大值,则无论如何都要发送它.
  1. When the first T notification arrives, add it to a buffer and start a countdown
  2. If another T notification arrives before the countdown expires, add it to the buffer and restart the countdown
  3. Once the countdown expires (i.e. the producer has been silent for some length of time), forward all the buffered T notifications as a single aggregate IList<T> notification.
  4. If the buffer size grows beyond some maximum before the countdown expires, send it anyway.

IObservable>Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎定期发送聚合通知,而不是在第一个通知到达时启动计时器并在额外通知时重新启动它"一个到达"我想要的行为,如果没有从下面产生通知,它还会在每个时间窗口结束时发送一个空列表.

IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) looked promising, but it appears to send aggregate notifications out at regular intervals rather than doing the "start the timer when the first notification arrives and restart it when additional ones arrive" behavior I'd like, and it also sends out an empty list at the end of each time window if no notifications have been produced from below.

不想删除任何T通知;只是缓冲它们.

I do not want to drop any of the T notifications; just buffer them.

这样的东西存在吗,还是我需要自己写?

Does anything like this exist, or do I need to write my own?

推荐答案

SO 上存在一些类似的问题,但并不完全是这样.这是一个可以解决问题的扩展方法.

Some similar questions exist on SO but not exactly like this. Here's an extension method that does the trick.

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
                                          (this IObservable<TSource> source,
                                           int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

这篇关于“缓冲直到安静"来自 Reactive 的行为?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆