Spark not distributing load to tasks evenly


Problem Description

The last stage, RDD.saveAsTextFile, is very slow. I suspect the records are not evenly distributed across partitions and tasks. Is there any way to force a more even distribution?
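
One way to confirm that suspicion is to count the records per partition of the RDD that feeds saveAsTextFile. Below is a minimal sketch using only standard Spark Java APIs; the helper name logPartitionSizes is illustrative, not part of the code in question.

    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    public class PartitionSizeCheck {
        // Sketch: log how many records each partition holds, e.g. for the
        // jsonStringRdd built in the method below before it is saved.
        public static void logPartitionSizes(JavaRDD<String> rdd) {
            // glom() turns every partition into a single List, so its size is the record count
            List<Integer> sizes = rdd.glom().map(new Function<List<String>, Integer>() {
                @Override
                public Integer call(List<String> partition) throws Exception {
                    return partition.size();
                }
            }).collect();
            for (int i = 0; i < sizes.size(); i++) {
                System.out.println("partition " + i + " -> " + sizes.get(i) + " records");
            }
        }
    }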

   public static JavaRDD<String> getJsonUserIdVideoIdRDD(JavaRDD<Rating> cachedRating,
                                                      JavaPairRDD<Integer, Integer> userIdClusterId,
                                                      int numPartitions, String outDir){
    /*
     convert the JavaRDD<Rating>  to JavaPairRDD<Integer,DmRating>
     */
    JavaPairRDD<Integer,DmRating> userIdDmRating = cachedRating.mapToPair(new PairFunction<Rating, Integer, DmRating>() {
        public Tuple2<Integer, DmRating> call(Rating dmRating) throws Exception {
            return new Tuple2<>(dmRating.user(), (DmRating)dmRating);
        }
    });

    /*
    join this RDD with userIdClusterID RDD by key
     */
    JavaPairRDD<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating = userIdClusterId.join(userIdDmRating, numPartitions);

    // extract the clusterId to videoId map
    JavaPairRDD<Integer, Integer> clusterIdVideoId =  userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer,DmRating>>, Integer, Integer>() {
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userIdDmRatingClusterId ) throws Exception {
            Integer userId = userIdDmRatingClusterId._1();
            Tuple2<Integer, DmRating> dmRatingClusterId = userIdDmRatingClusterId._2();
            return new Tuple2<Integer, Integer>(dmRatingClusterId._1(), dmRatingClusterId._2().product());
        }
    });
    //////
    /// Count the popularity of a video in a cluster
    JavaPairRDD<String, Integer> clusterIdVideoIdStrInt = clusterIdVideoId.mapToPair(new PairFunction<Tuple2<Integer, Integer>, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<Integer, Integer> videoIdClusterId) throws Exception {
            return new Tuple2<>(String.format("%d:%d", videoIdClusterId._1(), videoIdClusterId._2()), 1);
        }
    });
    JavaPairRDD<String, Integer> clusterIdVideoIdStrCount =   clusterIdVideoIdStrInt.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1+v2;
        }
    });
    ///

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterId_T_videoIdCount = clusterIdVideoIdStrCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> clusterIdVideoIdStrCount) throws Exception {
            String[] splits = clusterIdVideoIdStrCount._1().split(":");
            try{
                if(splits.length==2){
                    int clusterId = Integer.parseInt(splits[0]);
                    int videoId = Integer.parseInt(splits[1]);
                    return new Tuple2<>(clusterId, new Tuple2<>(videoId, clusterIdVideoIdStrCount._2()));
                }else{
                    //Should never occur
                    LOGGER.error("Could not split {} into two with : as the separator!", clusterIdVideoIdStrCount._1());
                }
            }catch (NumberFormatException ex){
                LOGGER.error(ex.getMessage());
            }
            return new Tuple2<>(-1, new Tuple2<>(-1,-1));
        }
    });

    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> clusterIdVideoIdGrouped = clusterId_T_videoIdCount.groupByKey();

    JavaPairRDD<Integer, DmRating> clusterIdDmRating = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, DmRating>>, Integer, DmRating>() {
        @Override
        public Tuple2<Integer, DmRating> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating) throws Exception {
           return userId_T_clusterIdDmRating._2();
        }
    });

    JavaPairRDD<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> clusterId_T_DmRatingVideoIds = clusterIdDmRating.join(clusterIdVideoIdGrouped, numPartitions);

    JavaPairRDD<Integer, String> userIdStringRDD = clusterId_T_DmRatingVideoIds.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>>, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> v1) throws Exception {
            int clusterId = v1._1();
            Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>> tuple = v1._2();
            DmRating rating = tuple._1();
            Iterable<Tuple2<Integer, Integer>> videosCounts= tuple._2();
            StringBuilder recosStr = new StringBuilder();
            boolean appendComa = false;
            for(Tuple2<Integer, Integer> videoCount : videosCounts){
                if(appendComa) recosStr.append(",");
                recosStr.append("{");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("}");
                appendComa = true;
            }
            String val = String.format("{\"user_id\":\"%s\",\"v1st\":\"%s\",\"redis_uid\":%s,\"cluster_id\": %d,\"recommendations\":[  %s ]}", rating.dmUserId,  rating.dmV1stStr, rating.user(), clusterId, recosStr);
            return new Tuple2<Integer, String>(rating.user(), val);
        }
    });
    JavaPairRDD<Integer, Iterable<String>> groupedRdd = userIdStringRDD.groupByKey(numPartitions);
    JavaRDD<String> jsonStringRdd = groupedRdd.map(new Function<Tuple2<Integer, Iterable<String>>, String>() {
        @Override
        public String call(Tuple2<Integer, Iterable<String>> v1) throws Exception {
            for(String str : v1._2()){
                return str;
            }
            LOGGER.error("Could not fetch a string from iterable so returning empty");
            return "";
        }
    });

    //LOGGER.info("Number of items in RDD: {}", jsonStringRDD.count());
    //return jsonStringRDD.persist(StorageLevel.MEMORY_ONLY_SER_2());
    LOGGER.info("Repartitioning the data into {}", numPartitions );
    jsonStringRdd.cache().saveAsTextFile(outDir);
    return jsonStringRdd;
}
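
Regarding the literal question of forcing an even spread: an explicit repartition(numPartitions) right before the save shuffles the records into roughly equal-sized partitions, at the cost of one extra stage. A minimal sketch, reusing the jsonStringRdd, numPartitions, and outDir names from the method above (this only balances the final write; it does not remove skew in the upstream joins):

    // Sketch: force roughly equal-sized partitions before writing.
    // repartition() triggers a full shuffle, so it adds a stage, but each
    // save task then handles a similar number of records.
    JavaRDD<String> balanced = jsonStringRdd.repartition(numPartitions);
    balanced.saveAsTextFile(outDir);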

Cluster size:

1. Master: 16 CPUs, 32 GB RAM
2. Workers (4): 32 CPUs, 102 GB RAM, 4 x 375 GB SSD drives

I changed the code to use DataFrames instead, but the problem is the same.

public static void saveAlsKMeansRecosAsParquet(JavaPairRDD<Integer, Tuple2<DmRating, Integer>> userIdRatingClusterIdRDD,
                                                 int numPartitions,
                                                 JavaSparkContext javaSparkContext,
                                                 String outdir){

    JavaRDD<DmRating> dmRatingJavaRDD = userIdRatingClusterIdRDD.map(new Function<Tuple2<Integer, Tuple2<DmRating, Integer>>, DmRating>() {
        public DmRating call(Tuple2<Integer, Tuple2<DmRating, Integer>> v1) throws Exception {
            //Integer userId = v1._1();
            Tuple2<DmRating, Integer> values = v1._2();
            DmRating rating = values._1();
            Integer clusterId = values._2();
            rating.setClusterId(clusterId);
            rating.setVideoId(rating.product());
            rating.setV1stOrUserId((rating.userId== null || rating.userId.isEmpty())? rating.v1stId : rating.userId);
            rating.setRedisId(rating.user());
            return rating;
            //return String.format("{\"clusterId\": %s,\"userId\": %s, \"userId\":\"%s\", \"videoId\": %s}", clusterId, userId, rating.userId, rating.product());
        }
    });
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    DataFrame dmRatingDF = sqlContext.createDataFrame(dmRatingJavaRDD, DmRating.class);
    dmRatingDF.registerTempTable("dmrating");
    DataFrame clusterIdVideoIdDF = sqlContext.sql("SELECT clusterId, videoId FROM dmrating").cache();
    DataFrame rolledupClusterIdVideoIdDF = clusterIdVideoIdDF.rollup("clusterId","videoId").count().cache();
    DataFrame clusterIdUserIdDF = sqlContext.sql("SELECT clusterId, userId, redisId, v1stId FROM dmrating").distinct().cache();
    JavaRDD<Row> rolledUpRDD = rolledupClusterIdVideoIdDF.toJavaRDD();
    JavaRDD<Row> filteredRolledUpRDD = rolledUpRDD.filter(new Function<Row, Boolean>() {
        @Override
        public Boolean call(Row v1) throws Exception {
            //make sure the size and values of the properties are correct
            return !(v1.size()!=3 || v1.isNullAt(0) || v1.isNullAt(1) || v1.isNullAt(2));
        }
    });

    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterIdVideoIdCount = filteredRolledUpRDD.mapToPair(new PairFunction<Row, Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Row row) throws Exception {
            Tuple2<Integer, Integer> videoIdCount = new Tuple2<Integer, Integer>(row.getInt(1), Long.valueOf(row.getLong(2)).intValue());
            return new Tuple2<Integer, Tuple2<Integer, Integer>>(row.getInt(0),videoIdCount);
        }
    }).cache();
    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> groupedPair = clusterIdVideoIdCount.groupByKey(numPartitions).cache();
    JavaRDD<ClusterIdVideos> groupedFlat = groupedPair.map(new Function<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, ClusterIdVideos>() {
        @Override
        public ClusterIdVideos call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> v1) throws Exception {
            ClusterIdVideos row = new ClusterIdVideos();
            Iterable<Tuple2<Integer, Integer>> videosCounts= v1._2();
            StringBuilder recosStr = new StringBuilder();
            recosStr.append("[");
            boolean appendComa = false;
            for(Tuple2<Integer, Integer> videoCount : videosCounts){
                if(appendComa) recosStr.append(",");
                recosStr.append("{");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("}");
                appendComa = true;
            }
            recosStr.append("]");
            row.setClusterId(v1._1());
            row.setVideos(recosStr.toString());
            return row;
        }
    }).cache();

    DataFrame groupedClusterId = sqlContext.createDataFrame(groupedFlat, ClusterIdVideos.class);
    DataFrame recosDf = clusterIdUserIdDF.join(groupedClusterId, "clusterId");
    recosDf.write().parquet(outdir);
}

Answer

OK, I found the issue. The culprits were the groupBy and join operations. The documentation on Spark's website says:

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.

To optimize any join/groupByKey operation, the goal should be to reduce the shuffle. I found this deck very helpful in diagnosing the problem, especially slide 12.
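
As an illustration of that general advice (not the exact change the deck recommends), replacing a groupByKey-then-aggregate pattern with reduceByKey moves most of the work before the shuffle: values are combined map-side, so far less data travels between executors. The sketch below counts (clusterId, videoId) pairs directly; the RDD and parameter names mirror the question's code, but the composite tuple key is my own assumption.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class ShuffleLightCounts {
        // Sketch: count (clusterId, videoId) occurrences without groupByKey.
        // reduceByKey pre-aggregates on the map side, so far less data crosses
        // the shuffle than with groupByKey followed by summing each group.
        public static JavaPairRDD<Tuple2<Integer, Integer>, Integer> countPairs(
                JavaPairRDD<Integer, Integer> clusterIdVideoId, int numPartitions) {
            return clusterIdVideoId
                .mapToPair(new PairFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>() {
                    @Override
                    public Tuple2<Tuple2<Integer, Integer>, Integer> call(Tuple2<Integer, Integer> cv) throws Exception {
                        return new Tuple2<>(cv, 1);
                    }
                })
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                }, numPartitions); // explicit partition count keeps the reduce stage evenly sized
        }
    }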

I knew the cluster id data was very small (about 100 clusters per run), so I created a broadcast variable for the smaller table, broadcast it to all executors, and used it in the join. This worked well and cut the computation time from 2 hours to 10 minutes.

    //convert json string to DF
    DataFrame  groupedClusterId = sqlContext.read().json(groupedFlat.rdd());
    Broadcast<DataFrame> broadcastDataFrame= javaSparkContext.broadcast(groupedClusterId);

    DataFrame recosDf = clusterIdUserIdDF.join(broadcastDataFrame.value(),"clusterId");
    recosDf.write().parquet(outdir);
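
As a side note, Spark 1.5+ also exposes an explicit broadcast hint for DataFrame joins, org.apache.spark.sql.functions.broadcast, which tells the planner to ship the small side of the join to every executor without wrapping the DataFrame in a broadcast variable yourself. A minimal sketch, reusing the DataFrames from the snippet above:

    // Sketch (assumes Spark 1.5+): hint the optimizer to broadcast the small
    // cluster table so the join avoids shuffling the large side.
    DataFrame recosDf = clusterIdUserIdDF.join(
            org.apache.spark.sql.functions.broadcast(groupedClusterId), "clusterId");
    recosDf.write().parquet(outdir);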
