MongoDB Spark Connector - aggregation is slow

Problem description

I am running the same aggregation pipeline with a Spark application and on the Mongos console. On the console, the data is fetched in the blink of an eye, and only a second use of "it" is needed to retrieve all the expected data. According to the Spark WebUI, however, the Spark application takes almost two minutes.

As you can see, 242 tasks are being launched to fetch the result. I am not sure why such a high number of tasks is launched when the MongoDB aggregation only returns 40 documents. It looks like there is a lot of overhead.

The query I run on the Mongos console:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])

The Spark Application code

    // Load the collection as an RDD and push the aggregation pipeline down to MongoDB
    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    // Format each aggregated document as one semicolon-separated output line
    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");

After that, I use hdfs dfs -getmerge /user/spark/output/ output.csv and compare the results.

Why is the aggregation so slow? Isn't the call to withPipeline meant to reduce the amount of data that needs to be transferred to Spark? It looks like it isn't doing the same aggregation the Mongos console does. On the Mongos console it is blazing fast. I am using Spark 1.6.1 and mongo-spark-connector_2.10 version 1.1.0.

Another thing I am wondering about is that two executors get launched (because I am using the default execution settings at the moment), but only one of them does all the work. Why isn't the second executor doing any work?

Edit 2: When using a different aggregation pipeline and calling .count() instead of saveAsTextFile(..), 242 tasks are also created. This time 65,000 documents are returned.

Answer

The high number of tasks is caused by the default Mongo Spark partitioner strategy. It ignores the aggregation pipeline when calculating the partitions, for two main reasons:

  1. It reduces the cost of calculating the partitions
  2. It ensures the same behaviour for sharded and non-sharded partitioners

However, as you've found, they can generate empty partitions, which in your case is costly.
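
One way to see this is to count the documents per partition on the Spark side. A minimal sketch, assuming the aggregatedRdd from the question and the Spark 1.6 Java API (where FlatMapFunction.call returns an Iterable):

    // Needs java.util.{Collections, Iterator, List} and
    // org.apache.spark.api.java.function.FlatMapFunction in addition to the question's imports.
    // Counts how many documents land in each partition created by the default partitioner;
    // most of the 242 partitions should turn out to be empty.
    List<Integer> docsPerPartition = aggregatedRdd.mapPartitions(
            new FlatMapFunction<Iterator<Document>, Integer>() {
                @Override
                public Iterable<Integer> call(Iterator<Document> docs) {
                    int count = 0;
                    while (docs.hasNext()) {
                        docs.next();
                        count++;
                    }
                    return Collections.singletonList(count);
                }
            }).collect();

    int nonEmpty = 0;
    for (int count : docsPerPartition) {
        if (count > 0) {
            nonEmpty++;
        }
    }
    System.out.println(docsPerPartition.size() + " partitions, " + nonEmpty + " non-empty");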

The options for fixing this could be:

  1. Change partitioning strategy

Choose an alternative partitioner to reduce the number of partitions. For example, the PaginateByCount partitioner will split the database into a set number of partitions (a configuration sketch is shown further below).

Create your own partitioner - simply implement the trait and you will be able to apply the aggregation pipeline and partition up the results. See the HalfwayPartitioner and custom partitioner test for an example.

  2. Pre-aggregate the results into a collection using $out and read from there.
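
For the $out route, a minimal sketch using the plain MongoDB Java driver (3.x), run once before starting the Spark job; the host, database name, and the preAggregated target collection name are placeholders:

    // Needs com.mongodb.MongoClient (MongoDB Java driver).
    // Runs the pipeline once on the server and writes the small result set into a new
    // collection via $out ("preAggregated" is a placeholder name).
    MongoClient client = new MongoClient("mongoHost", 27017);
    client.getDatabase("myDb").getCollection("data").aggregate(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }"),
            Document.parse("{ $out: \"preAggregated\" }")))
            .first(); // iterating the result cursor is what triggers the $out stage
    client.close();

The Spark job can then point its input collection at preAggregated and call MongoSpark.load(sc) without any pipeline, so every partition only reads from the already aggregated collection.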

A custom partitioner should produce the best solution but there are ways to make better use of the available default partitioners.
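
As an example of making better use of the built-in partitioners, here is a sketch of selecting the paginate-by-count strategy through the SparkConf. The property names spark.mongodb.input.partitioner and spark.mongodb.input.partitionerOptions.numberOfPartitions and the MongoPaginateByCountPartitioner name should be checked against the configuration docs of the connector version in use; the URI, app name, and partition count are placeholders:

    // Needs org.apache.spark.SparkConf and org.apache.spark.api.java.JavaSparkContext.
    // Selects a partitioner that splits the collection into a fixed number of partitions
    // instead of the 242 produced by the default strategy.
    SparkConf conf = new SparkConf()
            .setAppName("MongoAggregation") // placeholder app name
            .set("spark.mongodb.input.uri", "mongodb://mongoHost:27017/myDb.data")
            .set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
            .set("spark.mongodb.input.partitionerOptions.numberOfPartitions", "8");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // The rest of the job stays the same: load, withPipeline, map, save.
    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);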

If you think there should be a default partitioner that uses the aggregation pipeline to calculate the partitions then please add a ticket to the MongoDB Spark Jira project.
