如何在 RxJava 中计算移动平均值 [英] how to calculate moving average in RxJava
问题描述
在金融领域,我们通常需要从时间序列数据流中计算移动窗口聚合值,以移动平均为例,假设我们有以下数据流(T是时间戳,V是实际vlaue):
In finance domain, we usually need to calculate the moving-window aggregate value from a stream of time series data, use moving average as an example, say we have the following data stream(T is time stamp and V is the actual vlaue):
[T0,V0],[T1,V1],[T2,V2],[T3,V3],[T4,V4],[T5,V5],[T6,V6],[T7,V7],[T8,V8],[T9,V9],[T10,1V0],......
从我们得到的流中计算移动平均值 3:
to calculate a moving average 3 from the stream we get:
avg([T0,V0],[T1,V1],[T2,V2]),
avg([T1,V1],[T2,V2],[T3,V3]),
avg([T2,V2],[T3,V3],[T4,V4]),
avg([T3,V3],[T4,V4],[T5,V5]),
avg([T4,V4],[T5,V5],[T6,V6]),...
要计算移动平均线,我们似乎可以这样做:
To calculate the moving average, it seems like we could do it by :
- 从原始流构建一个 Observable
- 通过将值聚合到组中,从原始流构建一个 Observable
- 使用聚合运算符计算步骤 2 中 Observable 的最终结果.
第 1 步和第 3 步实现起来很简单,但是,对于第 2 步,当前的 RxJava 似乎没有内置运算符来生成移动窗口组,window/groupBy 运算符似乎不适合这种情况,我没有找到一种简单的方法来组合现有运算符的解决方案,有人可以建议如何以优雅"的方式在 RxJava 中做到这一点吗?
Step 1 and 3 is trivial to implement, however, for step 2 it seems like current RxJava do not have build-in operator to produce moving-windows groups, the window/groupBy operator seems not fit in this case, and I did not find a easy way to compose a solution from existing operators, can any one suggest how to do this in RxJava in a "elegantly" fashion?
推荐答案
RxJava version: 0.15.1
RxJava version: 0.15.1
import java.util.List;
import rx.Observable;
import rx.util.functions.Action1;
class Bar {
public static void main(String args[]) {
Integer arr[] = {1, 2, 3, 4, 5, 6}; // N = 6
Observable<Integer> oi = Observable.from(arr);
// 1.- bundle 3, skip 1
oi.buffer(3, 1)
/**
* 2.- take only the first X bundles
* When bundle 3, X = N - 2 => 4
* When bundle 4, X = N - 3 => 3
* When bundle a, X = N - (a-1)
*/
.take(4)
// 3.- calculate average
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> lst) {
int sum = 0;
for(int i = 0; i < lst.size(); i++) {
sum += lst.get(i);
}
System.out.println("MA(3) " + lst +
" => " + sum / lst.size());
}
});
}
}
示例输出:
MA(3) [1, 2, 3] =>2
MA(3) [1, 2, 3] => 2
MA(3) [2, 3, 4] =>3
MA(3) [2, 3, 4] => 3
MA(3) [3, 4, 5] =>4
MA(3) [3, 4, 5] => 4
MA(3) [4, 5, 6] =>5
MA(3) [4, 5, 6] => 5
这篇关于如何在 RxJava 中计算移动平均值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!