Spark java.lang.StackOverflowError

Problem description

I'm using Spark to calculate the PageRank of user reviews, but I keep getting a java.lang.StackOverflowError when I run my code on a big dataset (40k entries). When I run the code on a small number of entries, it works fine.

Entry example:

product/productId: B00004CK40   review/userId: A39IIHQF18YGZA   review/profileName: C. A. M. Salas  review/helpfulness: 0/0 review/score: 4.0   review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.

The Code:

public void calculatePageRank() {
    sc.clearCallSite();
    sc.clearJobGroup();

    JavaRDD<String> rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");

    JavaRDD<String> rddMovieData = rddFileData.map(new Function<String, String>() {

        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + "\t" + userId;
        }
    });

    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction<String, String, String>() {

        @Override
        public Tuple2<String, String> call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            return new Tuple2<String, String>(data[0], data[1]);
        }
    }).groupByKey().cache();

    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
    List<Iterable<String>> cartUsersList = cartUsers.collect();
    JavaPairRDD<String, String> finalCartesian = null;
    int iterCounter = 0;
    for (Iterable<String> out : cartUsersList) {
        JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
        if (finalCartesian == null) {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
        } else {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
            if (iterCounter % 20 == 0) {
                finalCartesian.checkpoint();
            }
        }
    }
    JavaRDD<Tuple2<String, String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String, String>(m._1(), m._2()));

    finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2()) != 0);
    JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String, String>(m._1(), m._2()));

    JavaRDD<String> userIdPairsString = userIdPairs.map(new Function<Tuple2<String, String>, String>() {

        // t is a (userId, userId) pair; emit it as "userId1 userId2"
        @Override
        public String call(Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
        }
    });

    try {
        // calculate pagerank using this https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    sc.close();
}

Solution

I have several suggestions that should greatly improve the performance of the code in your question.

  1. Caching: Cache only those datasets that you need to refer to again and again, for the same or different operations (for example, in iterative algorithms).

An example is RDD.count: to tell you the number of lines in the file, Spark has to read the file. So if you write RDD.count, at that point the file will be read, the lines will be counted, and the count will be returned.

What if you call RDD.count again? The same thing happens: the file will be read and counted again. So what does RDD.cache do? If you now run RDD.count the first time, the file will be loaded, cached, and counted. If you call RDD.count a second time, the operation will use the cache: it just takes the data from the cache and counts the lines, with no recomputation.

Read more about caching here.

In your code sample, you never reuse anything that you've cached, so you can remove the .cache calls.
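To make the RDD.count explanation concrete, here is a toy model in plain Java (not Spark) of a lazily evaluated dataset. The class and method names (`CacheDemo`, `LazyLines`, `readsForTwoCounts`) are hypothetical and only mimic the behaviour described above: `cache()` merely marks the dataset, and the first action fills the cache.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Toy model (plain Java, not Spark) of a lazily evaluated dataset with an optional cache. */
public class CacheDemo {

    static class LazyLines {
        private final Supplier<List<String>> source; // stands in for reading the input file
        private boolean cacheRequested = false;
        private List<String> cached = null;

        LazyLines(Supplier<List<String>> source) {
            this.source = source;
        }

        // Like RDD.cache(): only marks the dataset; nothing is read yet.
        LazyLines cache() {
            cacheRequested = true;
            return this;
        }

        // Like RDD.count(): an action that triggers the computation.
        long count() {
            if (cached != null) {
                return cached.size();            // served from the cache, no re-read
            }
            List<String> lines = source.get();   // "reads the file"
            if (cacheRequested) {
                cached = lines;                  // the first action fills the cache
            }
            return lines.size();
        }
    }

    /** Calls count() twice and returns how many times the "file" was read. */
    static int readsForTwoCounts(boolean useCache) {
        AtomicInteger reads = new AtomicInteger();
        LazyLines lines = new LazyLines(() -> {
            reads.incrementAndGet();
            return Arrays.asList("line 1", "line 2", "line 3");
        });
        if (useCache) {
            lines.cache();
        }
        lines.count();
        lines.count();
        return reads.get();
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + readsForTwoCounts(false) + " reads"); // 2
        System.out.println("with cache: " + readsForTwoCounts(true) + " reads");     // 1
    }
}
```

The same reasoning explains why caching an RDD that is consumed by only one downstream step buys nothing: the cache is filled once and never read.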

  2. Parallelization: In the code sample, you collect the grouped data and then call parallelize on each individual element, even though that data was already in a distributed collection (an RDD). I suggest you merge the rddFileData, rddMovieData and rddPairReviewData steps so that they happen in one pass.
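A minimal sketch of the merged parsing step, under the assumption (taken from the question's code) that fields are tab-separated and each looks like "name: value". The helper name `toMovieUserPair` is hypothetical, and it returns a `Map.Entry` so it runs without Spark; in Spark, the same body would go into a single `mapToPair` on `rddFileData` returning a `scala.Tuple2`, followed directly by `groupByKey()`, instead of joining the fields with a tab in one `map` and splitting them again in the next.

```java
import java.util.Map;

/** Hypothetical helper: parses one review line straight into a (movieId, userId) pair. */
public class ReviewParser {

    // Assumes tab-separated fields of the form "name: value",
    // with productId first and userId second, as in the question's sample entry.
    static Map.Entry<String, String> toMovieUserPair(String line) {
        String[] fields = line.split("\t");
        String movieId = fieldValue(fields[0]); // product/productId
        String userId = fieldValue(fields[1]);  // review/userId
        return Map.entry(movieId, userId);
    }

    // Returns the text after the first ':' of a "name: value" field, trimmed.
    private static String fieldValue(String field) {
        return field.split(":", 2)[1].trim();
    }

    public static void main(String[] args) {
        String line = "product/productId: B00004CK40\treview/userId: A39IIHQF18YGZA\treview/score: 4.0";
        Map.Entry<String, String> pair = toMovieUserPair(line);
        System.out.println(pair.getKey() + " -> " + pair.getValue()); // B00004CK40 -> A39IIHQF18YGZA
    }
}
```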

Get rid of .collect, since it brings all the results back to the driver and may be the actual reason for your error.
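One way to drop the collect/parallelize/cartesian loop is to build the user pairs per movie group on the executors. The sketch below is plain Java, and the name `distinctUserPairs` is hypothetical. For one group it produces exactly what the question's `cartesian` plus the `compareTo(...) != 0` filter produce. In Spark, it could be called from `rddPairReviewData.values().flatMapToPair(...)`, converting each entry to a `scala.Tuple2` and returning an iterator (the Spark 2.x Java API expects `Iterator<Tuple2<K, V>>`). That way nothing is sent to the driver and no per-movie RDDs are created and unioned. The number of pairs still grows quadratically with the size of each group, as in the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: builds every ordered pair of distinct users who reviewed the same movie,
 * i.e. the per-group equivalent of cartesian(...) followed by dropping pairs of equal user IDs.
 */
public class UserPairs {

    static List<Map.Entry<String, String>> distinctUserPairs(Iterable<String> users) {
        List<String> list = new ArrayList<>();
        users.forEach(list::add);                 // the group's users, in order
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (String a : list) {
            for (String b : list) {
                if (!a.equals(b)) {               // same condition as compareTo(...) != 0
                    pairs.add(Map.entry(a, b));
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = distinctUserPairs(Arrays.asList("u1", "u2", "u3"));
        System.out.println(pairs.size()); // 6
        System.out.println(pairs);        // [u1=u2, u1=u3, u2=u1, u2=u3, u3=u1, u3=u2]
    }
}
```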
