Moving Average in Spark Java

Question

I have real-time streaming data coming into Spark, and I would like to do moving-average forecasting on that time-series data. Is there any way to implement this using Spark in Java?

I've already looked at https://gist.github.com/samklr/27411098f04fc46dcd05/revisions and the question "Apache Spark Moving Average", but both are written in Scala. Since I'm not familiar with Scala, I can't judge whether they would be useful, let alone convert the code to Java. Is there a direct implementation of forecasting in Spark Java?
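In its simplest form, a simple moving average (SMA) forecast predicts the next value of a series as the mean of its last k observations. The sketch below shows only that idea in plain Java, without Spark; the class `SmaForecast` and method `forecastNext` are hypothetical names chosen for illustration.

```java
import java.util.List;

// Hypothetical helper for illustration: one-step-ahead SMA forecast.
// The forecast for the next period is the mean of the last `window` observations.
final class SmaForecast {

    static double forecastNext(List<Double> series, int window) {
        if (window <= 0 || window > series.size()) {
            throw new IllegalArgumentException("window must be in 1..series.size()");
        }
        double sum = 0.0;
        // Sum only the most recent `window` values of the series.
        for (int i = series.size() - window; i < series.size(); i++) {
            sum += series.get(i);
        }
        return sum / window;
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(10.0, 11.0, 12.0, 13.0);
        // (11 + 12 + 13) / 3
        System.out.println(forecastNext(prices, 3)); // prints 12.0
    }
}
```

`List.of` requires Java 9 or later; on Java 8, `Arrays.asList` works the same way here.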

Recommended Answer

I took the question you referred to and struggled for a couple of hours to translate its Scala code into Java:

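import java.util.Date;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.rdd.RDDFunctions;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import scala.reflect.ClassTag;

// sc is a JavaSparkContext and slidingWindow an int window size, both defined elsewhere.
// Read a file containing the stock quotations.
// You can also parallelize a collection of objects to create an RDD.
JavaRDD<String> linesRDD = sc.textFile("some sample file containing stock prices");

// Convert the lines into our business objects
// (ConvertLineToStockQuotation is a user-defined FlatMapFunction, not shown here).
JavaRDD<StockQuotation> quotationsRDD = linesRDD.flatMap(new ConvertLineToStockQuotation());

// We need these two objects in order to use the MLlib RDDFunctions object
ClassTag<StockQuotation> classTag = scala.reflect.ClassManifestFactory.fromClass(StockQuotation.class);
RDD<StockQuotation> rdd = JavaRDD.toRDD(quotationsRDD);

// Instantiate an RDDFunctions object to work with
RDDFunctions<StockQuotation> rddFs = RDDFunctions.fromRDD(rdd, classTag);

// Group consecutive quotations into windows of slidingWindow elements (step 1)
// and map each window to a (date, SMA) tuple
JavaPairRDD<Date, Double> smaPerDate =
        rddFs.sliding(slidingWindow).toJavaRDD().mapToPair(new MovingAvgByDateFunction());
List<Tuple2<Date, Double>> smaPerDateList = smaPerDate.collect();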
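To see what the Spark pipeline computes without a cluster, the sketch below reproduces the same windowing on a plain Java list. MLlib's `sliding(windowSize)` groups every run of `windowSize` consecutive elements (step 1, in partition order), so n elements yield n - windowSize + 1 windows, and none at all if `windowSize` exceeds n. The class `LocalSlidingSma` is a hypothetical name; this only illustrates the semantics and is not a replacement for the distributed version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local (non-Spark) illustration of sliding(windowSize) followed by
// averaging each window, as MovingAvgByDateFunction does per window.
final class LocalSlidingSma {

    static List<Double> slidingAverages(List<Double> values, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<Double> averages = new ArrayList<>();
        // n values yield n - windowSize + 1 windows; none if windowSize > n.
        for (int start = 0; start + windowSize <= values.size(); start++) {
            double sum = 0.0;
            for (int i = start; i < start + windowSize; i++) {
                sum += values.get(i);
            }
            averages.add(sum / windowSize);
        }
        return averages;
    }

    public static void main(String[] args) {
        System.out.println(slidingAverages(List.of(1.0, 2.0, 3.0, 4.0, 5.0), 3)); // prints [2.0, 3.0, 4.0]
    }
}
```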

Then you need a new function class that does the actual calculation for each data window:

import java.util.Arrays;
import java.util.Date;

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Maps one sliding window of quotations to a (date, simple moving average) pair.
 * On the Java side, each window produced by sliding() arrives as an Object
 * that is really a StockQuotation[].
 */
public class MovingAvgByDateFunction implements PairFunction<Object, Date, Double> {

    private static final long serialVersionUID = 9220435667459839141L;

    @Override
    public Tuple2<Date, Double> call(Object t) throws Exception {
        StockQuotation[] stocks = (StockQuotation[]) t;

        // Arithmetic mean of the quotation values in this window (windows are never empty)
        double average = Arrays.stream(stocks)
                .mapToDouble(StockQuotation::getValue)
                .average()
                .getAsDouble();

        // The average is keyed by the timestamp of the first quotation in the window
        return new Tuple2<>(stocks[0].getTimestamp(), average);
    }
}
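`MovingAvgByDateFunction` recomputes the whole sum for every window, which costs O(k) per window of size k. When an ordered series is available locally (for example after `collect()`), a running sum gives each average in O(1) by adding the value that enters the window and subtracting the one that leaves it. This is a different technique from the answer's per-window sum, shown as a hedged sketch; `RunningSumSma` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration: simple moving averages via a running sum.
final class RunningSumSma {

    static List<Double> movingAverages(double[] values, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<Double> averages = new ArrayList<>();
        double windowSum = 0.0;
        for (int i = 0; i < values.length; i++) {
            windowSum += values[i];                   // newest value enters the window
            if (i >= windowSize) {
                windowSum -= values[i - windowSize];  // oldest value leaves the window
            }
            if (i >= windowSize - 1) {
                averages.add(windowSum / windowSize); // window is full: emit its average
            }
        }
        return averages;
    }

    public static void main(String[] args) {
        System.out.println(movingAverages(new double[] {1, 2, 3, 4, 5}, 3)); // prints [2.0, 3.0, 4.0]
    }
}
```

On very long series the repeated add/subtract can accumulate floating-point error, so recomputing the sum periodically (or per window, as the answer does) may be preferable when exactness matters.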

If you want more detail, I wrote about simple moving averages here: https://t.co/gmWltdANd3
