RxJava 通过任意值去抖动 [英] RxJava debounce by arbitrary value
问题描述
仍在弄清楚如何正确使用不同的 Rx* 运算符并偶然发现以下问题:
Still figuring out a proper use of different Rx* operators and stumbled upon the following problem:
我有以下类型的模型集合:
I have a collection of models of the following type:
class Model {
final long timestamp;
final Object data;
public Model(long timestamp, Object data) {
this.timestamp = timestamp;
this.data = data;
}
}
此集合按升序排序(按时间戳排序).
This collection is sorted in ascending order (sorted by timestamp).
我的目标 - 是按序列"对它们进行分组.序列" - 是元素序列,其中每个元素都非常接近其邻居:
My goal - is to group them by "sequences". "Sequence" - is sequence of elements where each element is really close to its neighbor:
----A-B-C-----D-E-F---H-I--->
在这种情况下,我有 3 个序列".轴上的位置由模型的 timestamp
属性(不是发射时间)定义.形成序列的最大距离应该是可配置的.
In this case I have 3 "sequences". Position on the axis is defined by Model's timestamp
property (not the emission time). Max distance to form a sequence should be configurable.
或者让我们举一个更真实的例子:
Or let's take more real example:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
在这种情况下,最大距离为 10 毫秒,我会得到 3 个序列 - (0,5,10) , (100,108,111,115) , (200,201,202)
In this case for max distance 10ms, I would get 3 sequences - (0,5,10) , (100,108,111,115) , (200,201,202)
这个逻辑与debounce
操作符非常相似.但不是实时去抖动,我需要通过一些自定义属性去抖动.
This logic is really similar to debounce
operator. But instead of debouncing by real time, I need to debounce by some custom property.
如果时间戳代表发射时间,我会这样做:
This is how I would do that if timestamp would represent the emission time:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
Observable<Model> modelsObservable = Observable.from(models).share();
modelsObservable.buffer(modelsObservable.debounce(10, TimeUnit.MILLISECONDS))
.subscribe(group -> {
//this is one of my groups
});
它不一定需要去抖动 - 我也在查看 groupBy
运算符,但我无法找出正确的分组标准..
It is not necessarily needs to be a debounce - I was also looking at groupBy
operator, but I couldn't figure out the proper grouping criteria..
推荐答案
我不会摆弄调度程序,而是利用 Buffer/Window(取决于您是否需要下游 observables 或集合)和 Scan.
I wouldn't fiddle with the schedulers, but leverage Buffer/Window (depending if you need downstream observables or collections) and Scan.
在 Rx.Net 中,您可以通过以下方式实现:
In Rx.Net you can achieve it with:
var models = new[] { 0, 5, 10, 100, 108, 111, 115, 200, 201, 202 }
.ToObservable();
var enrichedModels = models.Scan(
new { Current = -1, Prev = -1 },
(acc, cur) => new { Current = cur, Prev = acc.Current })
.Skip(1).Publish();
enrichedModels.Buffer(() => enrichedModels.SkipWhile(em => em.Current < em.Prev + 10))
.Select(seq => seq.Select(em => em.Prev))
.Subscribe(seq =>
{
Console.WriteLine(String.Join(",", seq));
});
enrichedModels.Connect();
结果:
0,5,10
100,108,111,115
200,201
如果您的源 observable 很热,可能可以跳过发布/连接.rx-java 拥有相同的操作符,但不是匿名类型,我想它们可以被元组或具体类替换.
Publish/Connect can probably be skipped if your source observable is hot. rx-java possesses the same operators, but not the anonymous types, I guess they can be replaced either by tuple or a concrete class.
这篇关于RxJava 通过任意值去抖动的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!