Adding max and min in a Spark stream in Java?


Problem Description

I am trying to add the max and min to each RDD in a Spark DStream, to each of its tuples. I wrote the following code, but can't understand how to pass the min and max parameters. Can anyone suggest a way to do this transformation? I tried the following:

JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> sortedtsStream =
        transformedMaxMintsStream.transformToPair(new Sort2());

class MinMax implements Function<JavaPairRDD<Tuple2<Long, Integer>, Integer>, JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> {
    Long max;
    Long min;

    @Override
    public JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> call(JavaPairRDD<Tuple2<Long, Integer>, Integer> input) throws Exception {
        JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> output;
        // Compute the batch extrema, then stamp them onto every tuple.
        max = input.max(new CMP1())._1._1;
        min = input.min(new CMP1())._1._1;
        output = input.mapToPair(new maptoMinMax());
        return output;
    }

    class maptoMinMax implements PairFunction<Tuple2<Tuple2<Long, Integer>, Integer>, Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> {

        @Override
        public Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> call(Tuple2<Tuple2<Long, Integer>, Integer> tuple2IntegerTuple2) throws Exception {
            return new Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>(
                    new Tuple2<Long, Integer>(tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2),
                    new Tuple3<Integer, Long, Long>(tuple2IntegerTuple2._2, max, min));
        }
    }
}

I get the following error; essentially, it seems like the min and max functions for JavaPairRDD were not found:

15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)
15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000
Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
        at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
        at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
        at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStrea

Solution

We can use rdd.transform to apply several operations on the same RDD and arrive at our result for each batch interval. We then add this result to each tuple (as per the question spec):

data.transform { rdd =>
  val mx = rdd.map(x => (x, x)).reduce { case ((x1, x2), (y1, y2)) => ((x1 min y1), (x2 max y2)) }
  rdd.map(elem => (elem, mx))
}
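
Note that mapping each element x to the pair (x, x) lets a single reduce compute both the minimum and the maximum in one pass over the batch, rather than running two separate actions (min and max) on the RDD.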

This produces an RDD like the following for each batch interval (random numbers between 1 and 999 inclusive):

(258,(0,998)) (591,(0,998)) ...

The Java version is semantically identical, but quite a bit more verbose due to all those Tuple2<...> objects.
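For reference, below is a minimal Java sketch of the same approach. It is only a sketch: it assumes a JavaDStream<Long> named data (a stand-in for the question's actual stream) and uses the pre-lambda Spark 1.x Java API. It computes the extrema with reduce rather than JavaPairRDD.max/min, which also sidesteps the NoSuchMethodError above.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

JavaDStream<Tuple2<Long, Tuple2<Long, Long>>> withMinMax = data.transform(
    new Function<JavaRDD<Long>, JavaRDD<Tuple2<Long, Tuple2<Long, Long>>>>() {
      @Override
      public JavaRDD<Tuple2<Long, Tuple2<Long, Long>>> call(JavaRDD<Long> rdd) {
        // Pair each element with itself, then reduce the pairs into (min, max)
        // in a single pass over the batch.
        final Tuple2<Long, Long> mx = rdd
            .map(new Function<Long, Tuple2<Long, Long>>() {
              @Override
              public Tuple2<Long, Long> call(Long x) {
                return new Tuple2<Long, Long>(x, x);
              }
            })
            .reduce(new Function2<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
              @Override
              public Tuple2<Long, Long> call(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                return new Tuple2<Long, Long>(Math.min(a._1, b._1), Math.max(a._2, b._2));
              }
            });
        // Attach the batch-wide (min, max) pair to every element.
        return rdd.map(new Function<Long, Tuple2<Long, Tuple2<Long, Long>>>() {
          @Override
          public Tuple2<Long, Tuple2<Long, Long>> call(Long elem) {
            return new Tuple2<Long, Tuple2<Long, Long>>(elem, mx);
          }
        });
      }
    });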
