When to use mapPartitions and mapPartitionsWithIndex?


Question


The PySpark documentation describes two functions:

mapPartitions(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
   >>> def f(iterator): yield sum(iterator)
   >>> rdd.mapPartitions(f).collect()
   [3, 7]

And ...

mapPartitionsWithIndex(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD, 
   while tracking the index of the original partition.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
   >>> def f(splitIndex, iterator): yield splitIndex
   >>> rdd.mapPartitionsWithIndex(f).sum()
   6

What use cases do these functions attempt to solve? I can't see why they would be required.

Solution

To answer this question, we need to compare map with mapPartitions/mapPartitionsWithIndex. The two partition-wise methods do almost the same thing, except that mapPartitionsWithIndex also tells you which partition is being processed.
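The doctest above can be hard to picture without a cluster, so here is a minimal plain-Python emulation of the mapPartitionsWithIndex semantics. The `map_partitions_with_index` helper is a sketch of the behavior, not Spark's actual implementation:

```python
def map_partitions_with_index(partitions, f):
    """Apply f(index, iterator) to each partition, mimicking
    RDD.mapPartitionsWithIndex on a list of in-memory partitions."""
    result = []
    for index, part in enumerate(partitions):
        # f receives the partition's index and an iterator over its elements
        result.extend(f(index, iter(part)))
    return result

# [1, 2, 3, 4] split into 4 partitions, as in the docs example above
partitions = [[1], [2], [3], [4]]
out = map_partitions_with_index(partitions, lambda i, it: [i])
print(out)       # the partition indices: [0, 1, 2, 3]
print(sum(out))  # 6, matching rdd.mapPartitionsWithIndex(f).sum()
```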

Now, mapPartitions and mapPartitionsWithIndex exist to optimize the performance of your application. For the sake of illustration, suppose every element in your RDD is an XML element and you need a parser to process each one, so you must create an instance of some parser class before you can proceed. You could do it in two ways:

map + foreach: In this case, a parser instance is created for each element, the element is processed, and the instance is then discarded without ever being reused for other elements. So if you are working with an RDD of 12 elements distributed among 4 partitions, the parser is instantiated 12 times. And since constructing such an instance is an expensive operation, this costs real time.
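The per-element cost can be sketched in plain Python. `Parser` here is a hypothetical stand-in for an expensive-to-construct class (it merely counts its own constructions), and the list comprehension plays the role of map:

```python
class Parser:
    """Hypothetical expensive-to-construct parser; counts constructions."""
    instances = 0

    def __init__(self):
        Parser.instances += 1  # pretend this is costly setup work

    def parse(self, element):
        return element.upper()

elements = ["a", "b", "c"] * 4  # 12 elements, as in the example above

# map-style processing: a fresh Parser for every single element
Parser.instances = 0
results = [Parser().parse(e) for e in elements]
print(Parser.instances)  # 12 constructions for 12 elements
```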

mapPartitions/mapPartitionsWithIndex: These two methods address the situation above. They work on partitions rather than on individual elements (don't get me wrong: every element is still processed), so the function you pass must accept an Iterator over all the elements of one partition. That lets you create the parser instance once per partition: with only 4 partitions, the parser is instantiated 4 times (8 fewer instantiations than with map in this example). In each partition, the instance processes all of that partition's elements and is then reclaimed by the garbage collector. You will notice that this can improve the performance of your application significantly.
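The once-per-partition pattern can be sketched the same way in plain Python. Again, `Parser` is a hypothetical class counting its own constructions, and `process_partition` is the kind of iterator-taking function you would pass to mapPartitions:

```python
class Parser:
    """Hypothetical expensive-to-construct parser; counts constructions."""
    instances = 0

    def __init__(self):
        Parser.instances += 1  # pretend this is costly setup work

    def parse(self, element):
        return element.upper()

def process_partition(iterator):
    # One Parser per partition, reused for every element in it
    parser = Parser()
    for element in iterator:
        yield parser.parse(element)

# 12 elements distributed among 4 partitions, as in the example above
partitions = [["a", "b", "c"]] * 4
Parser.instances = 0
results = [x for part in partitions for x in process_partition(iter(part))]
print(Parser.instances)  # 4 constructions, one per partition
print(len(results))      # 12: every element is still processed
```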

So the bottom line is: whenever some setup work is common to all elements and could be done once per partition rather than once per element, it is better to go with mapPartitions/mapPartitionsWithIndex.

Explanations with code examples can be found at the two links below:
https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/
http://apachesparkbook.blogspot.in/2015/11/mappartition-example.html
