pySpark forEachPartition - Where is code executed

Problem description

I'm using pySpark version 2.3 (I cannot update to 2.4 on my current dev system) and have the following questions concerning foreachPartition.

First, a little context: as far as I understand, pySpark UDFs force the Python code to be executed outside the Java Virtual Machine (JVM) in a separate Python instance, which costs performance. Since I need to apply some Python functions to my data and want to minimize overhead, I had the idea of at least loading a manageable chunk of data into the driver and processing it there as a Pandas DataFrame. However, this would mean losing the parallelism advantage Spark offers. Then I read that foreachPartition applies a function to all the data within a partition and hence allows parallel processing.
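
For illustration, a minimal sketch of that "load into the driver and use Pandas" idea could look like the following (the tiny DataFrame and the lower-casing step are made up for the example; everything after toPandas() runs on the driver only):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("TESTA",), ("TESTB",)], ["text"])

# Pull a manageable amount of data into the driver ...
pdf = df.toPandas()
# ... and apply a plain Python function there, without any Spark parallelism.
pdf["text"] = pdf["text"].apply(lambda s: s.lower())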

My questions now are:

  1. When I apply a Python function via foreachPartition, does the Python execution take place within the driver process (and is the partition data therefore transferred over the network to my driver)?

  2. Is the data processed row-wise within foreachPartition (meaning every RDD row is transferred one by one to the Python instance), or is the partition data processed at once (meaning, for example, that the whole partition is transferred to the instance and handled as a whole by one Python instance)?

Thanks in advance for your input!

My current "in driver" solution, which I used before foreachPartition in order to process entire batches, looked like this:

def partition_generator(rdd):
    glom = rdd.glom()
    #Optionally persist glom
    for partition in range(rdd.getNumPartitions()):
        yield glom.map(lambda row: row[partition]).collect()

A little explanation of what happens here: glom collects the rows of each partition into a list. Taken from the docs:

glom(self): Return an RDD created by coalescing all elements within each partition into a list.

So the for loop iterates over the number of available partitions (getNumPartitions()), and in every iteration a partition is yielded within the driver (glom.map(lambda row: row[partition]).collect()).

Answer

Luckily I stumbled upon this great explanation of mapPartitions from Mrinal (answered here).

mapPartitions applies a function to each partition of an RDD. Hence, parallelization can be used if the partitions are distributed over different nodes. The corresponding Python instances, which are necessary for processing the Python functions, are created on those nodes. While foreachPartition only applies a function (e.g. to write your data to a .csv file), mapPartitions also returns a new RDD. Therefore, using foreachPartition was the wrong choice for me.
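
As a rough sketch of that difference (the RDD and the two functions here are made up for illustration): foreachPartition is an action that only triggers side effects and returns nothing, while mapPartitions is a transformation that yields a new RDD:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(["TESTA", "TESTB", "TESTC"], 2)

def side_effect_per_partition(rows):
    # Runs once per partition on an executor; the return value is discarded.
    for row in rows:
        print(row.lower())  # e.g. write to a file or an external sink instead

def lower_partition(rows):
    # Returns an iterator; mapPartitions builds a new RDD from it.
    for row in rows:
        yield row.lower()

rdd.foreachPartition(side_effect_per_partition)   # action, returns None
lowered = rdd.mapPartitions(lower_partition)      # transformation, returns a new RDD
print(lowered.collect())                          # ['testa', 'testb', 'testc']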

In order to answer my second question: functions like map or UDFs create a new Python instance and pass data from the DataFrame/RDD row by row, resulting in a lot of overhead. foreachPartition and mapPartitions (both RDD functions) transfer an entire partition to a Python instance.

Additionally, using generators also reduces the amount of memory necessary for iterating over this transferred partition data (partitions are handled as iterator objects, while each row is then processed by iterating over this object).

An example could look like this:

def generator(partition):
    """
    Function yielding some result created by some function applied to each row of a partition (in this case lower-casing a string)

    @partition: iterator-object of partition
    """

    for row in partition:
        yield [word.lower() for word in row["text"]]


df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()


#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+
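
For contrast, the same lower-casing done row by row with a plain Python UDF could look like this (a sketch; the UDF is invoked once per row, so every row crosses the JVM/Python boundary on its own):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])

# udf() wraps the plain Python lambda; it is called once per row.
lower_udf = udf(lambda words: [word.lower() for word in words], ArrayType(StringType()))
df.withColumn("text", lower_udf("text")).show()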

Hope this helps somebody facing similar problems :)
