Spark pandas_udf is not faster


Question

I'm facing a heavy data transformation. In a nutshell, I have columns of data, each containing strings which correspond to some ordinals. For example, HIGH, MID and LOW. My objective is to map these strings to integers which will preserve the order. In this case, LOW -> 0, MID -> 1 and HIGH -> 2.

Here is a simple function generating such data:

def fresh_df(N=100000, seed=None):
    # Generate a Spark DataFrame with two random categorical columns.
    np.random.seed(seed)
    feat1 = np.random.choice(["HI", "LO", "MID"], size=N)
    feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=N)

    pdf = pd.DataFrame({
        "feat1": feat1,
        "feat2": feat2
    })
    return spark.createDataFrame(pdf)
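
The snippets below assume a DataFrame built with this generator; a minimal setup (the seed value here is arbitrary, not from the original post) would be:

df = fresh_df(seed=42)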

My first approach was:

feat1_dict = {"HI": 1, "MID": 2, "LO": 3}
feat2_dict = {"SMALL": 0, "MEDIUM": 1, "LARGE": 2}

# create_map takes a flat sequence of alternating keys and values,
# so each dict is flattened with chain(*dict.items()).
mappings = {
    "feat1": F.create_map([F.lit(x) for x in chain(*feat1_dict.items())]),
    "feat2": F.create_map([F.lit(x) for x in chain(*feat2_dict.items())])
}

for col in df.columns:
    col_map = mappings[col]
    df = df.withColumn(col + "_mapped", col_map[df[col]])  # per-row map lookup

This works as expected, but in practice it turned out to be slow, and I wanted to optimize the process. I read about pandas_udf and it gave me hope. Here is the modified code:

feats_dict = {
    "feat1": feat1_dict,
    "feat2": feat2_dict
}

for col_name in df.columns:
    # A scalar pandas UDF: receives a pandas Series and returns the
    # Series mapped through the corresponding dictionary.
    @F.pandas_udf('integer', F.PandasUDFType.SCALAR)
    def map_map(col):
        return col.map(feats_dict[col_name])
    df = df.withColumn(col_name + "_mapped", map_map(df[col_name]))
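
A side note: defining map_map inside the loop makes it close over the loop variable col_name. Here each UDF is applied within the same iteration, so the mapping is captured correctly, but a factory-based variant (a sketch, not part of the original post) binds each mapping explicitly and avoids any late-binding surprises:

def make_mapper(mapping):
    # Each call creates a fresh closure over its own mapping.
    @F.pandas_udf('integer', F.PandasUDFType.SCALAR)
    def map_map(col):
        return col.map(mapping)
    return map_map

for col_name in df.columns:
    df = df.withColumn(col_name + "_mapped",
                       make_mapper(feats_dict[col_name])(df[col_name]))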

Alas! When comparing these two versions there was no improvement in execution time. I compared them on a local instance of Spark (using docker) and on a 5-node EMR cluster (with the default configuration).

I created a notebook where you can see all the code. In general, I used the following imports:

import numpy as np
import pandas as pd

from itertools import chain
from pyspark.sql import functions as F

What am I missing? Why is this process so slow, and why is there no improvement when using pandas_udf?

Answer

Why so slow? Because Spark runs in the JVM while pyspark runs in a separate Python process, so evaluating a Python UDF requires serializing the data out of the JVM, shipping it to the Python worker, and deserializing the results back into the JVM.
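
One way to see this boundary (a quick sketch, assuming df and a map_map pandas UDF for feat1 from the question are in scope) is to inspect the physical plan:

# The pandas UDF version shows an ArrowEvalPython node in the plan:
# row batches are shipped to a Python worker (via Arrow) and back.
df.withColumn("feat1_mapped", map_map(df["feat1"])).explain()

# The when/otherwise version below plans as a plain Project over JVM
# expressions, with no Python evaluation node at all.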

You can map the values with the when and otherwise functions instead, which keeps the computation inside the JVM and avoids the serialize/deserialize round-trip, improving performance.

import numpy as np
import pandas as pd
import pyspark.sql.functions as f
from pyspark.shell import spark


def fresh_df(n=100000, seed=None):
    np.random.seed(seed)
    feat1 = np.random.choice(["HI", "LO", "MID"], size=n)
    feat2 = np.random.choice(["SMALL", "MEDIUM", "LARGE"], size=n)

    pdf = pd.DataFrame({
        "feat1": feat1,
        "feat2": feat2
    })
    return spark.createDataFrame(pdf)


df = fresh_df()

# Chained when/otherwise expressions are evaluated entirely in the JVM:
df = df.withColumn('feat1_mapped', f
                   .when(df.feat1 == f.lit('HI'), 1)
                   .otherwise(f.when(df.feat1 == f.lit('MID'), 2).otherwise(3)))

df = df.withColumn('feat2_mapped', f
                   .when(df.feat2 == f.lit('SMALL'), 0)
                   .otherwise(f.when(df.feat2 == f.lit('MEDIUM'), 1).otherwise(2)))
df.show(n=20)

Output

+-----+------+------------+------------+
|feat1| feat2|feat1_mapped|feat2_mapped|
+-----+------+------------+------------+
|   LO| SMALL|           3|           0|
|   LO|MEDIUM|           3|           1|
|  MID|MEDIUM|           2|           1|
|  MID| SMALL|           2|           0|
|  MID| LARGE|           2|           2|
|  MID| SMALL|           2|           0|
|   LO| SMALL|           3|           0|
|  MID| LARGE|           2|           2|
|  MID| LARGE|           2|           2|
|  MID| SMALL|           2|           0|
|  MID|MEDIUM|           2|           1|
|   LO| LARGE|           3|           2|
|   HI|MEDIUM|           1|           1|
|   LO| SMALL|           3|           0|
|   HI|MEDIUM|           1|           1|
|  MID| SMALL|           2|           0|
|  MID|MEDIUM|           2|           1|
|   HI| SMALL|           1|           0|
|   HI| LARGE|           1|           2|
|  MID| LARGE|           2|           2|
+-----+------+------------+------------+
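
If the hard-coded chains feel brittle, the same expressions can be generated from the original dictionaries. Below is a minimal sketch (the ordinal_expr helper is hypothetical, not from the original answer); note that, unlike the code above, values missing from the mapping fall through to null rather than a default:

def ordinal_expr(col_name, mapping):
    # Fold a {string: int} dict into one chained when(...) expression.
    expr = None
    for value, code in mapping.items():
        cond = f.col(col_name) == f.lit(value)
        expr = f.when(cond, code) if expr is None else expr.when(cond, code)
    return expr

df = df.withColumn('feat1_mapped', ordinal_expr('feat1', {"HI": 1, "MID": 2, "LO": 3}))
df = df.withColumn('feat2_mapped', ordinal_expr('feat2', {"SMALL": 0, "MEDIUM": 1, "LARGE": 2}))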

