Towards limiting the big RDD


Question

I am reading many images and I would like to work on a tiny subset of them for development. As a result, I am trying to understand how I could make that happen:

In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'

In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43

In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...

..so what is happening? I would expect the limit() to run much faster than what we had in [2], but that's not the case*.

Below I will describe my understanding, and please correct me, since obviously I am missing something:

  1. d is an RDD of pairs (I know that from the schema), and with the map function I am saying:

    i) Take every pair (which will be named x) and give me back its photo_id attribute.

    ii) That will result in a new (anonymous) RDD, on which we apply the first() method, which I am not sure how it works$, but it should give me the first element of that anonymous RDD.

  2. In [3], we limit the d RDD to 1, which means that even though d has many elements, only 1 is used and the map function is applied to that one element only. Out[3] should be the RDD created by the mapping.

  3. In [4], I would expect to follow the logic of [3] and just print the one and only element of the limited RDD...


As expected, after looking at the monitor, [4] seems to process the whole dataset, while the others don't, so it seems that I am not using limit() correctly, or that it's not what I am looking for:


Edit:

tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()

The first line gives a PipelinedRDD which, as described here, does not actually perform any action, just a transformation.

However, the second line also processes the whole dataset (as a matter of fact, the number of tasks is now the same as before, plus one!).


*[2] executed instantly, while [4] is still running and >3h have passed.

$I couldn't find it in the documentation, because of the name.

Solution

Based on your code, here is a simpler test case on Spark 2.0:

case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line

Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so check the physical plans of the two cases:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]

For the first case, it is related to an optimisation in the CollectLimitExec physical operator. That is, it will first fetch the first partition to get the limit number of rows (1 in this case); if the limit is not satisfied, it fetches more partitions until the desired limit is reached. So, in general, if the first partition is not empty, only the first partition will be computed and fetched; the other partitions will not even be computed.
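
As a rough illustration of that incremental strategy, here is a small self-contained Scala sketch. It is not Spark's actual CollectLimitExec code; the batching scheme, the scale-up factor of 4, and the collectLimit name are assumptions made purely for the example:

import scala.collection.mutable.ArrayBuffer

// Conceptual sketch only: scan the first partition, and only scan
// more partitions if the limit has not been satisfied yet.
def collectLimit[T](partitions: Seq[Seq[T]], limit: Int): Seq[T] = {
  val buf = ArrayBuffer.empty[T]
  var scanned = 0          // number of partitions already read
  var partsToScan = 1      // start with just the first partition
  while (buf.size < limit && scanned < partitions.size) {
    val batch = partitions.slice(scanned, scanned + partsToScan)
    batch.foreach(p => buf ++= p.take(limit - buf.size))
    scanned += partsToScan
    partsToScan *= 4       // assumed scale-up factor for the next round
  }
  buf.take(limit).toSeq
}

// With a non-empty first partition, only that partition is ever touched:
// collectLimit(Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6)), limit = 1) == Seq(1)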

However, in the second case, the optimisation in CollectLimitExec does not help, because the previous limit operation involves a shuffle. All partitions are computed, LocalLimit(1) runs on each partition to get 1 row, and then all partitions are shuffled into a single partition. CollectLimitExec then fetches 1 row from the resulting single partition.
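
If the goal is simply to grab one transformed row quickly, here is a sketch of two possible workarounds under that assumption (the fastA/fastB names are mine, and the same spark-shell implicits as in the test case above are assumed): apply the limit after the map so the CollectLimitExec early stop can kick in, or pull the row to the driver with take and map it there.

// Both avoid the shuffle introduced by df1.limit(1):
val fastA = df1.map { r => r.getAs[Int](0) }.limit(1).collect()   // early-stop CollectLimit
val fastB = df1.take(1).map { r => r.getAs[Int](0) }              // take 1 row, map on the driver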
