RxJava 通过任意值去抖动 [英] RxJava debounce by arbitrary value

查看:82
本文介绍了RxJava 通过任意值去抖动的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

仍在弄清楚如何正确使用不同的 Rx* 运算符并偶然发现以下问题:

Still figuring out a proper use of different Rx* operators and stumbled upon the following problem:

我有以下类型的模型集合:

I have a collection of models of the following type:

class Model {

    final long timestamp;
    final Object data;

    public Model(long timestamp, Object data) {
        this.timestamp = timestamp;
        this.data = data;
    }
}

此集合按升序排序(按时间戳排序).

This collection is sorted in ascending order (sorted by timestamp).

我的目标 - 是按序列"对它们进行分组.序列" - 是元素序列,其中每个元素都非常接近其邻居:

My goal - is to group them by "sequences". "Sequence" - is sequence of elements where each element is really close to its neighbor:

----A-B-C-----D-E-F---H-I--->

在这种情况下,我有 3 个序列".轴上的位置由模型的 timestamp 属性(不是发射时间)定义.形成序列的最大距离应该是可配置的.

In this case I have 3 "sequences". Position on the axis is defined by Model's timestamp property (not the emission time). Max distance to form a sequence should be configurable.

或者让我们举一个更真实的例子:

Or let's take more real example:

List<Model> models = new ArrayList<Model>(10) {{
    add(new Model(0, null));
    add(new Model(5, null));
    add(new Model(10, null));
    add(new Model(100, null));
    add(new Model(108, null));
    add(new Model(111, null));
    add(new Model(115, null));
    add(new Model(200, null));
    add(new Model(201, null));
    add(new Model(202, null));
}};

在这种情况下,最大距离为 10 毫秒,我会得到 3 个序列 - (0,5,10) , (100,108,111,115) , (200,201,202)

In this case for max distance 10ms, I would get 3 sequences - (0,5,10) , (100,108,111,115) , (200,201,202)

这个逻辑与debounce操作符非常相似.但不是实时去抖动,我需要通过一些自定义属性去抖动.

This logic is really similar to debounce operator. But instead of debouncing by real time, I need to debounce by some custom property.

如果时间戳代表发射时间,我会这样做:

This is how I would do that if timestamp would represent the emission time:

List<Model> models = new ArrayList<Model>(10) {{
    add(new Model(0, null));
    add(new Model(5, null));
    add(new Model(10, null));
    add(new Model(100, null));
    add(new Model(108, null));
    add(new Model(111, null));
    add(new Model(115, null));
    add(new Model(200, null));
    add(new Model(201, null));
    add(new Model(202, null));
}};

Observable<Model> modelsObservable = Observable.from(models).share();

modelsObservable.buffer(modelsObservable.debounce(10, TimeUnit.MILLISECONDS))
        .subscribe(group -> {
            //this is one of my groups
        });

它不一定需要去抖动 - 我也在查看 groupBy 运算符,但我无法找出正确的分组标准..

It is not necessarily needs to be a debounce - I was also looking at groupBy operator, but I couldn't figure out the proper grouping criteria..

推荐答案

我不会摆弄调度程序,而是利用 Buffer/Window(取决于您是否需要下游 observables 或集合)和 Scan.

I wouldn't fiddle with the schedulers, but leverage Buffer/Window (depending if you need downstream observables or collections) and Scan.

在 Rx.Net 中,您可以通过以下方式实现:

In Rx.Net you can achieve it with:

        var models = new[] { 0, 5, 10, 100, 108, 111, 115, 200, 201, 202 }
            .ToObservable();

        var enrichedModels = models.Scan(
            new { Current = -1, Prev = -1 },
            (acc, cur) => new { Current = cur, Prev = acc.Current })
            .Skip(1).Publish();

        enrichedModels.Buffer(() => enrichedModels.SkipWhile(em => em.Current < em.Prev + 10))
            .Select(seq => seq.Select(em => em.Prev))
            .Subscribe(seq =>
            {
                Console.WriteLine(String.Join(",", seq));
            });

        enrichedModels.Connect();

结果:

0,5,10
100,108,111,115
200,201

如果您的源 observable 很热,可能可以跳过发布/连接.rx-java 拥有相同的操作符,但不是匿名类型,我想它们可以被元组或具体类替换.

Publish/Connect can probably be skipped if your source observable is hot. rx-java possesses the same operators, but not the anonymous types, I guess they can be replaced either by tuple or a concrete class.

这篇关于RxJava 通过任意值去抖动的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆