Spark java.lang.StackOverflowError

Problem Description

I'm using Spark to calculate the PageRank of user reviews, but I keep getting java.lang.StackOverflowError when I run my code on a big dataset (40k entries). When running the code on a small number of entries, it works fine.

A sample entry:

product/productId: B00004CK40   review/userId: A39IIHQF18YGZA   review/profileName: C. A. M. Salas  review/helpfulness: 0/0 review/score: 4.0   review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.

The code:

public void calculatePageRank() {
    sc.clearCallSite();
    sc.clearJobGroup();

    JavaRDD<String> rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");

    // Parse each review line into "movieId\tuserId".
    JavaRDD<String> rddMovieData = rddFileData.map(new Function<String, String>() {

        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + "\t" + userId;
        }
    });

    // Group user ids by movie id.
    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction<String, String, String>() {

        @Override
        public Tuple2<String, String> call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            return new Tuple2<String, String>(data[0], data[1]);
        }
    }).groupByKey().cache();

    // For every movie, build the cartesian product of its reviewers on the
    // driver, accumulating the results into one RDD via repeated union.
    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
    List<Iterable<String>> cartUsersList = cartUsers.collect();
    JavaPairRDD<String, String> finalCartesian = null;
    int iterCounter = 0;
    for (Iterable<String> out : cartUsersList) {
        JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
        if (finalCartesian == null) {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
        } else {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
            // Checkpoint periodically to keep the lineage from growing unboundedly.
            if (iterCounter % 20 == 0) {
                finalCartesian.checkpoint();
            }
        }
        iterCounter++; // the counter must advance, or the checkpoint condition never varies
    }
    JavaRDD<Tuple2<String, String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String, String>(m._1(), m._2()));

    // Drop self-pairs (a user paired with themselves).
    finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2()) != 0);
    JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String, String>(m._1(), m._2()));

    // Render each (userId, userId) pair as "userId1 userId2" for the PageRank input.
    JavaRDD<String> userIdPairsString = userIdPairs.map(new Function<Tuple2<String, String>, String>() {

        @Override
        public String call(Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
        }
    });

    try {
        // Calculate PageRank using
        // https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        e.printStackTrace();
    }

    sc.close();
}


Recommended Answer

I have several suggestions that will help you greatly improve the performance of the code in your question.


  1. Caching: Caching should be used on data sets that you need to refer to again and again for the same or different operations (iterative algorithms).

An example is RDD.count — to tell you the number of lines in the file, the file needs to be read. So if you write RDD.count, at this point the file will be read, the lines will be counted, and the count will be returned.

What if you call RDD.count again? The same thing: the file will be read and counted again. So what does RDD.cache do? Now, if you run RDD.count the first time, the file will be loaded, cached, and counted. If you call RDD.count a second time, the operation will use the cache. It will just take the data from the cache and count the lines, no recomputing.
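
A minimal sketch of that behaviour (the file name here is hypothetical, and sc is assumed to be a live JavaSparkContext):

JavaRDD<String> lines = sc.textFile("reviews.txt").cache();

long first = lines.count();  // reads the file, caches the partitions, counts the lines
long second = lines.count(); // answered from the cache: the file is not read again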

Read more about caching here.

In your code sample you are not reusing anything that you've cached. So you may remove the .cache from there.


  2. Parallelization: In the code sample, you've parallelized every individual element of your RDD, which is already a distributed collection. I suggest you merge the rddFileData, rddMovieData and rddPairReviewData steps so that it all happens in one go.
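
For illustration, here is a sketch of that merged step, reusing the parsing logic from the question (it assumes the tab-separated field layout shown in the sample entry above):

// Parse each review line straight into a (movieId, userId) pair and
// group once, instead of materializing rddMovieData in between.
JavaPairRDD<String, Iterable<String>> rddPairReviewData = sc
    .textFile(inputFileName)
    .mapToPair(line -> {
        String[] data = line.split("\t");
        String movieId = data[0].split(":")[1].trim();
        String userId = data[1].split(":")[1].trim();
        return new Tuple2<>(movieId, userId);
    })
    .groupByKey();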

Get rid of the .collect, since that brings the results back to the driver; it may well be the actual reason for your error.
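
One possible distributed alternative (a sketch, not the only way to do it): since rddPairReviewData already groups the users per movie, the user pairs can be generated inside a flatMap on the executors, so nothing is collected to the driver and the long cartesian/union lineage disappears:

// Build the "userId1 userId2" lines per movie on the executors
// (assumes java.util.List and java.util.ArrayList are imported).
// Note: flatMap expects an Iterator in Spark 2.x; on Spark 1.x,
// return the list itself instead of pairs.iterator().
JavaRDD<String> userIdPairsString = rddPairReviewData.values()
    .flatMap(users -> {
        List<String> pairs = new ArrayList<>();
        for (String u1 : users) {
            for (String u2 : users) {
                if (!u1.equals(u2)) {
                    pairs.add(u1 + " " + u2);
                }
            }
        }
        return pairs.iterator();
    });

Each movie's reviewer list is small compared to the whole dataset, so the nested loop stays cheap, and the result feeds straight into JavaPageRank.calculatePageRank as before.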
