Spark not distributing load to tasks evenly
Problem description
The last stage of RDD.saveAsTextFile is very slow. I suspect the issue is that the records are not evenly distributed across partitions and tasks. Is there any way to force an even distribution?
public static JavaRDD<String> getJsonUserIdVideoIdRDD(JavaRDD<Rating> cachedRating,
                                                      JavaPairRDD<Integer, Integer> userIdClusterId,
                                                      int numPartitions, String outDir){
    /*
    convert the JavaRDD<Rating> to JavaPairRDD<Integer,DmRating>
    */
    JavaPairRDD<Integer,DmRating> userIdDmRating = cachedRating.mapToPair(new PairFunction<Rating, Integer, DmRating>() {
        public Tuple2<Integer, DmRating> call(Rating dmRating) throws Exception {
            return new Tuple2<>(dmRating.user(), (DmRating)dmRating);
        }
    });
    /*
    join this RDD with userIdClusterID RDD by key
    */
    JavaPairRDD<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating = userIdClusterId.join(userIdDmRating, numPartitions);
    // extract the clusterId to videoId map
    JavaPairRDD<Integer, Integer> clusterIdVideoId = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer,DmRating>>, Integer, Integer>() {
        public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userIdDmRatingClusterId ) throws Exception {
            Integer userId = userIdDmRatingClusterId._1();
            Tuple2<Integer, DmRating> dmRatingClusterId = userIdDmRatingClusterId._2();
            return new Tuple2<Integer, Integer>(dmRatingClusterId._1(), dmRatingClusterId._2().product());
        }
    });
    //////
    /// Count the popularity of a video in a cluster
    JavaPairRDD<String, Integer> clusterIdVideoIdStrInt = clusterIdVideoId.mapToPair(new PairFunction<Tuple2<Integer, Integer>, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<Integer, Integer> videoIdClusterId) throws Exception {
            return new Tuple2<>(String.format("%d:%d", videoIdClusterId._1(), videoIdClusterId._2()), 1);
        }
    });
    JavaPairRDD<String, Integer> clusterIdVideoIdStrCount = clusterIdVideoIdStrInt.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1+v2;
        }
    });
    ///
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterId_T_videoIdCount = clusterIdVideoIdStrCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> clusterIdVideoIdStrCount) throws Exception {
            String[] splits = clusterIdVideoIdStrCount._1().split(":");
            try{
                if(splits.length==2){
                    int clusterId = Integer.parseInt(splits[0]);
                    int videoId = Integer.parseInt(splits[1]);
                    return new Tuple2<>(clusterId, new Tuple2<>(videoId, clusterIdVideoIdStrCount._2()));
                }else{
                    //Should never occur
                    LOGGER.error("Could not split {} into two with : as the separator!", clusterIdVideoIdStrCount._1());
                }
            }catch (NumberFormatException ex){
                LOGGER.error(ex.getMessage());
            }
            return new Tuple2<>(-1, new Tuple2<>(-1,-1));
        }
    });
    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> clusterIdVideoIdGrouped = clusterId_T_videoIdCount.groupByKey();
    JavaPairRDD<Integer, DmRating> clusterIdDmRating = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, DmRating>>, Integer, DmRating>() {
        @Override
        public Tuple2<Integer, DmRating> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating) throws Exception {
            return userId_T_clusterIdDmRating._2();
        }
    });
    JavaPairRDD<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> clusterId_T_DmRatingVideoIds = clusterIdDmRating.join(clusterIdVideoIdGrouped, numPartitions);
    JavaPairRDD<Integer, String> userIdStringRDD = clusterId_T_DmRatingVideoIds.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>>, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> v1) throws Exception {
            int clusterId = v1._1();
            Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>> tuple = v1._2();
            DmRating rating = tuple._1();
            Iterable<Tuple2<Integer, Integer>> videosCounts= tuple._2();
            StringBuilder recosStr = new StringBuilder();
            boolean appendComa = false;
            for(Tuple2<Integer, Integer> videoCount : videosCounts){
                if(appendComa) recosStr.append(",");
                recosStr.append("{");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("}");
                appendComa = true;
            }
            String val = String.format("{\"user_id\":\"%s\",\"v1st\":\"%s\",\"redis_uid\":%s,\"cluster_id\": %d,\"recommendations\":[ %s ]}", rating.dmUserId, rating.dmV1stStr, rating.user(), clusterId, recosStr);
            return new Tuple2<Integer, String>(rating.user(), val);
        }
    });
    JavaPairRDD<Integer, Iterable<String>> groupedRdd = userIdStringRDD.groupByKey(numPartitions);
    JavaRDD<String> jsonStringRdd = groupedRdd.map(new Function<Tuple2<Integer, Iterable<String>>, String>() {
        @Override
        public String call(Tuple2<Integer, Iterable<String>> v1) throws Exception {
            for(String str : v1._2()){
                return str;
            }
            LOGGER.error("Could not fetch a string from iterable so returning empty");
            return "";
        }
    });
    //LOGGER.info("Number of items in RDD: {}", jsonStringRDD.count());
    //return jsonStringRDD.persist(StorageLevel.MEMORY_ONLY_SER_2());
    LOGGER.info("Repartitioning the data into {}", numPartitions );
    jsonStringRdd.cache().saveAsTextFile(outDir);
    return jsonStringRdd;
}
Cluster size:
1. Master: 16 CPUs, 32 GB
2. 4 workers: 32 CPUs, 102 GB, 4 × 375 GB SSD drives
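Whether records really are unevenly spread can be checked directly: in Spark, something like `rdd.glom().map(List::size).collect()` returns the record count of every partition. The plain-Java sketch below (no Spark dependency) models *why* key-based shuffles can be lopsided. It mimics the hash-partitioning rule (a non-negative `hashCode()` modulo the partition count, as Spark's `HashPartitioner` does) and counts records per partition. The class name, the 100-cluster/256-partition numbers and the hot key are hypothetical. The point it illustrates: when a `join` or `groupByKey` is keyed by `clusterId` and there are only about 100 distinct clusters, at most 100 tasks receive any data, and one large cluster makes its task far slower than the rest.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of hash partitioning (no Spark dependency).
// Mirrors the rule Spark's HashPartitioner uses: a non-negative
// key.hashCode() modulo the number of partitions.
class PartitionSkew {

    // Partition index for a key; Math.floorMod keeps it non-negative.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Count how many records each partition (and hence each task) receives.
    static long[] recordsPerPartition(List<?> keys, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (Object key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    // Partitions that get at least one record; the rest run as empty tasks.
    static int nonEmptyPartitions(long[] counts) {
        int n = 0;
        for (long c : counts) {
            if (c > 0) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // Hypothetical data: 100 cluster IDs, 10 records each, 256 partitions.
        List<Integer> keys = new ArrayList<>();
        for (int clusterId = 0; clusterId < 100; clusterId++) {
            for (int i = 0; i < 10; i++) keys.add(clusterId);
        }
        // Hypothetical hot key: cluster 7 gets 900 extra records.
        for (int i = 0; i < 900; i++) keys.add(7);

        long[] counts = recordsPerPartition(keys, 256);
        // prints "non-empty partitions: 100 of 256"
        System.out.println("non-empty partitions: " + nonEmptyPartitions(counts) + " of 256");
        // prints "records in cluster 7's partition: 910"
        System.out.println("records in cluster 7's partition: " + counts[partitionFor(7, 256)]);
    }
}
```

Raising the partition count does not help here: the number of busy tasks is capped by the number of distinct keys, and the size of the slowest task is set by the largest key.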
I changed the code to use DataFrames instead, but the issue is still the same:
public static void saveAlsKMeansRecosAsParquet(JavaPairRDD<Integer, Tuple2<DmRating, Integer>> userIdRatingClusterIdRDD,
                                               int numPartitions,
                                               JavaSparkContext javaSparkContext,
                                               String outdir){
    JavaRDD<DmRating> dmRatingJavaRDD = userIdRatingClusterIdRDD.map(new Function<Tuple2<Integer, Tuple2<DmRating, Integer>>, DmRating>() {
        public DmRating call(Tuple2<Integer, Tuple2<DmRating, Integer>> v1) throws Exception {
            //Integer userId = v1._1();
            Tuple2<DmRating, Integer> values = v1._2();
            DmRating rating = values._1();
            Integer clusterId = values._2();
            rating.setClusterId(clusterId);
            rating.setVideoId(rating.product());
            rating.setV1stOrUserId((rating.userId== null || rating.userId.isEmpty())? rating.v1stId : rating.userId);
            rating.setRedisId(rating.user());
            return rating;
            //return String.format("{\"clusterId\": %s,\"userId\": %s, \"userId\":\"%s\", \"videoId\": %s}", clusterId, userId, rating.userId, rating.product());
        }
    });
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    DataFrame dmRatingDF = sqlContext.createDataFrame(dmRatingJavaRDD, DmRating.class);
    dmRatingDF.registerTempTable("dmrating");
    DataFrame clusterIdVideoIdDF = sqlContext.sql("SELECT clusterId, videoId FROM dmrating").cache();
    DataFrame rolledupClusterIdVideoIdDF = clusterIdVideoIdDF.rollup("clusterId","videoId").count().cache();
    DataFrame clusterIdUserIdDF = sqlContext.sql("SELECT clusterId, userId, redisId, v1stId FROM dmrating").distinct().cache();
    JavaRDD<Row> rolledUpRDD = rolledupClusterIdVideoIdDF.toJavaRDD();
    JavaRDD<Row> filteredRolledUpRDD = rolledUpRDD.filter(new Function<Row, Boolean>() {
        @Override
        public Boolean call(Row v1) throws Exception {
            //make sure the size and values of the properties are correct
            return !(v1.size()!=3 || v1.isNullAt(0) || v1.isNullAt(1) || v1.isNullAt(2));
        }
    });
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterIdVideoIdCount = filteredRolledUpRDD.mapToPair(new PairFunction<Row, Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Tuple2<Integer, Integer>> call(Row row) throws Exception {
            Tuple2<Integer, Integer> videoIdCount = new Tuple2<Integer, Integer>(row.getInt(1), Long.valueOf(row.getLong(2)).intValue());
            return new Tuple2<Integer, Tuple2<Integer, Integer>>(row.getInt(0),videoIdCount);
        }
    }).cache();
    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> groupedPair = clusterIdVideoIdCount.groupByKey(numPartitions).cache();
    JavaRDD<ClusterIdVideos> groupedFlat = groupedPair.map(new Function<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, ClusterIdVideos>() {
        @Override
        public ClusterIdVideos call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> v1) throws Exception {
            ClusterIdVideos row = new ClusterIdVideos();
            Iterable<Tuple2<Integer, Integer>> videosCounts= v1._2();
            StringBuilder recosStr = new StringBuilder();
            recosStr.append("[");
            boolean appendComa = false;
            for(Tuple2<Integer, Integer> videoCount : videosCounts){
                if(appendComa) recosStr.append(",");
                recosStr.append("{");
                recosStr.append("\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",");
                recosStr.append("\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("}");
                appendComa = true;
            }
            recosStr.append("]");
            row.setClusterId(v1._1());
            row.setVideos(recosStr.toString());
            return row;
        }
    }).cache();
    DataFrame groupedClusterId = sqlContext.createDataFrame(groupedFlat, ClusterIdVideos.class);
    DataFrame recosDf = clusterIdUserIdDF.join(groupedClusterId, "clusterId");
    recosDf.write().parquet(outdir);
}
Recommended answer
OK, I found the issue. The culprits were the groupByKey and join operations. The documentation on the Spark website says:
In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
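The all-to-all step described in the quote can be modelled in a few lines of plain Java. This is a single-process sketch, not Spark's shuffle implementation: "map-side" partitions hold `(key, count)` records, every record is routed to reducer `floorMod(key.hashCode(), numReducers)` (so each reducer reads from every map partition), and each reducer then sums the values for the keys it received. The class name and the `"clusterId:videoId"` sample keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, single-process model of the all-to-all shuffle behind reduceByKey.
class ShuffleSketch {

    static List<Map<String, Integer>> reduceByKey(
            List<List<Map.Entry<String, Integer>>> mapPartitions, int numReducers) {
        // One input buffer per reduce task.
        List<List<Map.Entry<String, Integer>>> reducerInputs = new ArrayList<>();
        for (int r = 0; r < numReducers; r++) {
            reducerInputs.add(new ArrayList<>());
        }
        // All-to-all step: read every partition and send each record to its key's reducer.
        for (List<Map.Entry<String, Integer>> partition : mapPartitions) {
            for (Map.Entry<String, Integer> rec : partition) {
                int r = Math.floorMod(rec.getKey().hashCode(), numReducers);
                reducerInputs.get(r).add(rec);
            }
        }
        // Each reduce task now holds all values for its keys and can finish them alone.
        List<Map<String, Integer>> results = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> input : reducerInputs) {
            Map<String, Integer> sums = new HashMap<>();
            for (Map.Entry<String, Integer> rec : input) {
                sums.merge(rec.getKey(), rec.getValue(), Integer::sum);
            }
            results.add(sums);
        }
        return results;
    }

    public static void main(String[] args) {
        // Hypothetical "clusterId:videoId" keys spread over two map partitions.
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
                List.of(Map.entry("3:42", 1), Map.entry("5:7", 1)),
                List.of(Map.entry("3:42", 1), Map.entry("3:42", 1)));
        List<Map<String, Integer>> results = reduceByKey(partitions, 2);
        for (int r = 0; r < results.size(); r++) {
            System.out.println("reducer " + r + ": " + results.get(r));
        }
    }
}
```

Every key ends up on exactly one reducer, which is what makes the result correct, and also why a key with many values produces one oversized task.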
To optimize join and groupByKey operations, the goal should be to reduce the shuffle. I found this slide deck very helpful in diagnosing the problem, especially slide 12.
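One standard way to shrink a shuffle is map-side combining: `reduceByKey` and `aggregateByKey` pre-aggregate inside each partition before shuffling, whereas `groupByKey` ships every value and builds each key's full group in a single task. In the question's first version, `groupByKey(numPartitions)` is followed by a map that keeps only the first string per key; `reduceByKey((a, b) -> a)` would also keep one value per key, but with map-side combining (in both versions, which string survives is arbitrary). The plain-Java sketch below only counts how many records would be shuffled under each strategy; the class name and sample keys are hypothetical.

```java
import java.util.HashSet;
import java.util.List;

// Counts how many records cross the network in a toy shuffle.
// groupByKey-style: no map-side combine, so every record is shuffled.
// reduceByKey-style: each map partition pre-aggregates, so at most one
// record per distinct key per partition is shuffled.
class CombineSketch {

    static int shuffledWithoutCombine(List<List<String>> partitions) {
        int n = 0;
        for (List<String> keys : partitions) {
            n += keys.size();
        }
        return n;
    }

    static int shuffledWithCombine(List<List<String>> partitions) {
        int n = 0;
        for (List<String> keys : partitions) {
            n += new HashSet<>(keys).size(); // one partial result per distinct key
        }
        return n;
    }

    public static void main(String[] args) {
        // Hypothetical keys of the records held by two partitions.
        List<List<String>> partitions = List.of(
                List.of("u1", "u1", "u1", "u2"),
                List.of("u1", "u2", "u2"));
        System.out.println("without combine: " + shuffledWithoutCombine(partitions)); // 7
        System.out.println("with combine:    " + shuffledWithCombine(partitions));    // 4
    }
}
```

The saving grows with the number of duplicate keys per partition; when almost every key is unique, combining helps little and avoiding the shuffle altogether (as in the broadcast approach) is the bigger win.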
I know the cluster ID data is very small (100 clusters per run), so I created a broadcast variable for the smaller table, broadcast it to all executors, and used that variable in the join. This worked well and reduced the computation time from 2 hours to 10 minutes.
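// Convert the JSON strings to a DataFrame
// (read().json expects an RDD of JSON strings, so groupedFlat here holds JSON strings, not ClusterIdVideos objects)
DataFrame groupedClusterId = sqlContext.read().json(groupedFlat.rdd());
// Broadcast the small cluster table to all executors
Broadcast<DataFrame> broadcastDataFrame = javaSparkContext.broadcast(groupedClusterId);
// Join the user rows with the broadcast variable's value on clusterId
DataFrame recosDf = clusterIdUserIdDF.join(broadcastDataFrame.value(), "clusterId");
recosDf.write().parquet(outdir);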
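The idea behind this fix is a broadcast (map-side) hash join: the small side is copied whole to every executor and each task joins its own slice of the large side locally, so rows are never shuffled by `clusterId` and no single task has to hold every row of a large cluster. The plain-Java sketch below models that idea only; the class name and data are hypothetical and it is not Spark's implementation. In the DataFrame API, `org.apache.spark.sql.functions.broadcast(df)` (available since Spark 1.5) marks a DataFrame for a broadcast join, and Spark SQL also broadcasts a join side automatically when its estimated size is below `spark.sql.autoBroadcastJoinThreshold`. Note that `SparkContext.broadcast` on a DataFrame serializes the DataFrame object (its query plan) rather than its rows, and `value()` on the driver returns that same DataFrame, so in the snippet above the planner still chooses the join strategy; `clusterIdUserIdDF.join(functions.broadcast(groupedClusterId), "clusterId")` would be the explicit form, assuming a Spark version that provides it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model of a broadcast (map-side) hash join.
// The small table (clusterId -> videos JSON) is copied whole to every task;
// each task joins its own slice of the large table locally, so the large
// side is never shuffled by key.
class BroadcastJoinSketch {

    static List<String> joinPartition(List<Map.Entry<Integer, String>> bigPartition,
                                      Map<Integer, String> smallTable) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> row : bigPartition) {
            String videos = smallTable.get(row.getKey()); // O(1) lookup in the broadcast copy
            if (videos != null) { // inner join: skip rows whose clusterId is missing
                out.add(row.getKey() + "," + row.getValue() + "," + videos);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical broadcast side: about 100 clusters in the real job, 2 here.
        Map<Integer, String> clusters = Map.of(
                1, "[{\"video_id\":10,\"count\":3}]",
                2, "[{\"video_id\":20,\"count\":1}]");
        // Hypothetical slice of the large (clusterId, userId) side held by one task.
        List<Map.Entry<Integer, String>> partition = List.of(
                Map.entry(1, "userA"), Map.entry(2, "userB"), Map.entry(9, "userC"));
        for (String line : joinPartition(partition, clusters)) {
            System.out.println(line);
        }
    }
}
```

This only pays off while the small side fits comfortably in each executor's memory, which holds for a table of about 100 clusters.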