Spark pandas_udf is not faster
Question
I'm facing a heavy data transformation. In a nutshell, I have columns of data, each containing strings which correspond to some ordinals. For example, HIGH, MID and LOW. My objective is to map these strings to integers which will preserve the order. In this case, LOW -> 0, MID -> 1 and HIGH -> 2.
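As a point of reference, the core of the intended mapping can be sketched in plain pandas with Series.map (the category names here simply mirror the example above; this is an illustration, not the Spark code in question):

```python
import pandas as pd

# Ordinal mapping that preserves the order LOW < MID < HIGH.
order = {"LOW": 0, "MID": 1, "HIGH": 2}

s = pd.Series(["HIGH", "LOW", "MID", "HIGH"])
print(s.map(order).tolist())  # [2, 0, 1, 2]
```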
Here is a simple function generating such data:
def fresh_df(N=100000, seed=None):
    np.random.seed(seed)
    feat1 = np.random.choice(["HI", "LO", "MID"], size=N)
    feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=N)
    pdf = pd.DataFrame({
        "feat1": feat1,
        "feat2": feat2
    })
    return spark.createDataFrame(pdf)
My first approach was:
feat1_dict = {"HI": 1, "MID": 2, "LO": 3}
feat2_dict = {"SMALL": 0, "MEDIUM": 1, "LARGE": 2}

mappings = {
    "feat1": F.create_map([F.lit(x) for x in chain(*feat1_dict.items())]),
    "feat2": F.create_map([F.lit(x) for x in chain(*feat2_dict.items())])
}

for col in df.columns:
    col_map = mappings[col]
    df = df.withColumn(col + "_mapped", col_map[df[col]])
This works as expected but in reality it turns out to be slow and I wanted to optimize the process. I read about pandas_udf and it gave me hope. Here is the modified code:
feats_dict = {
    "feat1": feat1_dict,
    "feat2": feat2_dict
}

for col_name in df.columns:
    @F.pandas_udf('integer', F.PandasUDFType.SCALAR)
    def map_map(col):
        return col.map(feats_dict[col_name])
    df = df.withColumn(col_name + "_mapped", map_map(df[col_name]))
Alas! When comparing these two versions there was no improvement in terms of execution time. I compared the two on a local instance of Spark (using docker) and on a 5-node EMR cluster (with the default configurations).
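Independently of raw speed, one thing worth double-checking in the loop above is Python's late-binding closures: map_map closes over the loop variable col_name, so depending on when the UDF is serialized it may look up a different mapping than the one intended for that column. A minimal plain-pandas sketch of the safer pattern, binding each mapping at definition time (make_mapper is a hypothetical helper, not part of the original code):

```python
import pandas as pd

feats_dict = {
    "feat1": {"HI": 1, "MID": 2, "LO": 3},
    "feat2": {"SMALL": 0, "MEDIUM": 1, "LARGE": 2},
}

def make_mapper(mapping):
    # The mapping is captured here, once per call, instead of being
    # resolved through the loop variable when the function runs.
    def mapper(col: pd.Series) -> pd.Series:
        return col.map(mapping)
    return mapper

mappers = {name: make_mapper(m) for name, m in feats_dict.items()}

s = pd.Series(["HI", "LO", "MID"])
print(mappers["feat1"](s).tolist())  # [1, 3, 2]
```

In the Spark version, wrapping mapper with F.pandas_udf inside make_mapper would follow the same pattern.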
I created a notebook where you can see all the code. In general, I used the following imports:
import numpy as np
import pandas as pd
from itertools import chain
from pyspark.sql import functions as F
What am I missing? Why is this process so slow, and why is there no improvement when using pandas_udf?
Answer
Why so slow? Because Spark runs in the JVM and pyspark doesn't (it is a separate Python process), and to make that work all the data has to be serialized and deserialized as it moves between the Python process and the JVM.
You can map the values with the when and otherwise functions instead. These run natively in the JVM and avoid the serialization and deserialization round trip, increasing performance.
import numpy as np
import pandas as pd
import pyspark.sql.functions as f
from pyspark.shell import spark

def fresh_df(n=100000, seed=None):
    np.random.seed(seed)
    feat1 = np.random.choice(["HI", "LO", "MID"], size=n)
    feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=n)
    pdf = pd.DataFrame({
        "feat1": feat1,
        "feat2": feat2
    })
    return spark.createDataFrame(pdf)

df = fresh_df()

df = df.withColumn('feat1_mapped', f
                   .when(df.feat1 == f.lit('HI'), 1)
                   .otherwise(f.when(df.feat1 == f.lit('MID'), 2).otherwise(3)))

df = df.withColumn('feat2_mapped', f
                   .when(df.feat2 == f.lit('SMALL'), 0)
                   .otherwise(f.when(df.feat2 == f.lit('MEDIUM'), 1).otherwise(2)))

df.show(n=20)
Output:
+-----+------+------------+------------+
|feat1| feat2|feat1_mapped|feat2_mapped|
+-----+------+------------+------------+
| LO| SMALL| 3| 0|
| LO|MEDIUM| 3| 1|
| MID|MEDIUM| 2| 1|
| MID| SMALL| 2| 0|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| LO| SMALL| 3| 0|
| MID| LARGE| 2| 2|
| MID| LARGE| 2| 2|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| LO| LARGE| 3| 2|
| HI|MEDIUM| 1| 1|
| LO| SMALL| 3| 0|
| HI|MEDIUM| 1| 1|
| MID| SMALL| 2| 0|
| MID|MEDIUM| 2| 1|
| HI| SMALL| 1| 0|
| HI| LARGE| 1| 2|
| MID| LARGE| 2| 2|
+-----+------+------------+------------+