pySpark forEachPartition - Where is code executed
Question
I'm using pySpark in version 2.3 (I cannot update to 2.4 on my current dev system) and have the following questions concerning foreachPartition.
First, a little context: as far as I understand, pySpark UDFs force the Python code to be executed outside the Java Virtual Machine (JVM) in a separate Python instance, which costs performance. Since I need to apply some Python functions to my data and want to minimize overhead, I had the idea of loading at least a manageable chunk of data into the driver and processing it as a Pandas DataFrame. However, this would sacrifice the parallelism advantage Spark offers. Then I read that foreachPartition applies a function to all the data within a partition and hence allows parallel processing.
My questions now are:
- When I apply a Python function via foreachPartition, does the Python execution take place within the driver process (and is the partition data therefore transferred over the network to my driver)?
- Is the data processed row-wise within foreachPartition (meaning every RDD row is transferred one by one to the Python instance), or is the partition data processed at once (meaning, for example, that the whole partition is transferred to the instance and handled as a whole by one Python instance)?
Thanks in advance for your input!
My current "in driver" solution, which I used before foreachPartition in order to process entire batches, looked like:
def partition_generator(rdd):
    glom = rdd.glom()
    # Optionally persist glom
    for partition in range(rdd.getNumPartitions()):
        yield glom.map(lambda row: row[partition]).collect()
A little explanation of what happens here: glom groups the rows of each partition into one list per partition. Taken from the docs:
glom(self): Return an RDD created by coalescing all elements within each partition into a list.
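To make the quoted behavior concrete without a Spark runtime, here is a plain-Python sketch of what glom does; the list-of-iterators representation of partitions and the helper name glom_like are assumptions for illustration, not pySpark API:

```python
def glom_like(partitions):
    # Like RDD.glom(): coalesce each partition's elements into one list,
    # so the result has one list per partition.
    return [list(p) for p in partitions]

# Pretend an RDD with two partitions holding [1, 2] and [3, 4, 5]:
print(glom_like([iter([1, 2]), iter([3, 4, 5])]))  # [[1, 2], [3, 4, 5]]
```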
So the for-loop iterates over the number of available partitions (getNumPartitions()), and in every iteration a partition is yielded within the driver (glom.map(lambda row: row[partition]).collect()).
Answer
Luckily, I stumbled upon this great explanation of mapPartitions from Mrinal (answered here).
mapPartitions applies a function to each partition of an RDD. Hence, parallelization is possible if the partitions are distributed over different nodes. The Python instances necessary for processing the Python functions are created on these nodes. While foreachPartition only applies a function for its side effects (e.g. to write your data to a .csv file), mapPartitions also returns a new RDD. Therefore, using foreachPartition was the wrong choice for me.
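To illustrate the difference in return behavior without a running Spark cluster, here is a plain-Python analogy; the list-of-lists partition representation and the helper names are hypothetical stand-ins, not pySpark API:

```python
partitions = [[1, 2, 3], [4, 5, 6]]  # pretend an RDD with 2 partitions

def foreach_partition_like(parts, f):
    # Like RDD.foreachPartition: call f on each partition iterator
    # purely for its side effects; nothing is returned.
    for p in parts:
        f(iter(p))

def map_partitions_like(parts, f):
    # Like RDD.mapPartitions: f maps an iterator to an iterator,
    # producing a new dataset.
    return [list(f(iter(p))) for p in parts]

sums = []
foreach_partition_like(partitions, lambda it: sums.append(sum(it)))
print(sums)  # [6, 15]

doubled = map_partitions_like(partitions, lambda it: (x * 2 for x in it))
print(doubled)  # [[2, 4, 6], [8, 10, 12]]
```

The side-effecting call only mutates state it can reach, while the mapping call hands back a new collection, mirroring why mapPartitions was the right tool here.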
To answer my second question: functions like map or UDFs create a new Python instance and pass data from the DataFrame/RDD row by row, resulting in a lot of overhead. foreachPartition and mapPartitions (both RDD functions) transfer an entire partition to a Python instance.
Additionally, using generators reduces the amount of memory needed to iterate over this transferred partition data (each partition is handled as an iterator object, and each row is then processed by iterating over that object).
An example might look like this:
def generator(partition):
    """
    Yield a result created by applying a function to each row
    of a partition (in this case, lower-casing a string).
    @partition: iterator object over the partition's rows
    """
    for row in partition:
        yield [word.lower() for word in row["text"]]

df = spark.createDataFrame([(["TESTA"], ), (["TESTB"], )], ["text"])
df = df.repartition(2)
df.rdd.mapPartitions(generator).toDF(["text"]).show()
#Result:
+-----+
| text|
+-----+
|testa|
|testb|
+-----+
Hope this helps somebody facing similar problems :)