Memory leaks when using pandas_udf and Parquet serialization?

Problem description

I am currently developing my first whole system using PySpark and I am running into some strange, memory-related issues. In one of the stages, I would like to follow a split-apply-combine strategy in order to modify a DataFrame. That is, I would like to apply a function to each of the groups defined by a given column and finally combine them all. The problem is that the function I want to apply is the prediction method of a fitted model that "speaks" the Pandas idiom, i.e., it is vectorized and takes a Pandas Series as input.
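
For context, a minimal sketch of the kind of predictor I mean is shown below; the stand-in model is purely illustrative (the real one is a fitted model whose predict() is vectorized over a Pandas Series):

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


class DummyModel:
    """Stand-in for a fitted model with a vectorized predict()."""
    def predict(self, values):
        return 2.0 * np.asarray(values, dtype=float) + 1.0


model = DummyModel()


@F.pandas_udf(spktyp.DoubleType())
def predict(series):
    # series is a Pandas Series holding one Arrow batch of input values
    return pd.Series(model.predict(series.values))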

I have then designed an iterative strategy, traversing the groups and manually applying a scalar pandas_udf in order to solve the problem. The combination part is done with incremental calls to DataFrame.unionByName(). I have decided not to use the GroupedMap type of pandas_udf because the docs state that the memory has to be managed by the user, and that special care is needed whenever one of the groups might be too large to keep in memory or to represent as a Pandas DataFrame.
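
For reference, this is a rough sketch of the GROUPED_MAP alternative I decided against; it assumes the df, group_col, in_col and out_col defined in the reproducible example further below, and every group is materialized as a single Pandas DataFrame, which is exactly the memory concern mentioned above:

import pyspark.sql.functions as F
import pyspark.sql.types as spktyp

out_schema = spktyp.StructType(
    df.schema.fields + [spktyp.StructField(out_col, spktyp.DoubleType(), True)]
)


@F.pandas_udf(out_schema, F.PandasUDFType.GROUPED_MAP)
def predict_group(pdf):
    # pdf holds one whole group as a Pandas DataFrame
    pdf[out_col] = pdf[in_col] + 100.0  # same dummy logic as the scalar UDF
    return pdf


result = df.groupby(group_col).apply(predict_group)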

The main problem is that all the processing seems to run fine, but in the end I want to serialize the final DataFrame to a Parquet file, and it is at this point that I receive a lot of Java-style errors about the DataFrameWriter, or out-of-memory exceptions.

I have tried the code on both Windows and Linux machines. The only way I have managed to avoid the errors has been to increase the --driver-memory value on those machines. The minimum value is different on every platform and depends on the size of the problem, which makes me suspect a memory leak.
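
For reference, a sketch of how the driver memory can be raised; "mre.py" is just a placeholder name for the script below and the 4g value is purely illustrative, not the actual minimum:

# Option 1: pass the memory on the command line
#   spark-submit --driver-memory 4g mre.py
# Option 2: set it through the session builder (only effective if the
#   driver JVM has not been launched yet)
spark = (pyspark.sql.SparkSession.builder
         .appName("mre")
         .master("local[3]")
         .config("spark.driver.memory", "4g")
         .getOrCreate())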

The problem did not happen until I started using pandas_udf. I think that there is probably a memory leak somewhere in the whole process of pyarrow serialization taking place under the hood when using a pandas_udf.

I have created a minimal reproducible example. If I run this script directly with Python, it produces the error. Using spark-submit and increasing the driver memory a lot, it is possible to make it work.

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
    return x + 100.0


# Initialization ---------------------------------------------------------------
spark = pyspark.sql.SparkSession.builder.appName(
        "mre").master("local[3]").getOrCreate()

sc = spark.sparkContext

# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"

z = 105
m = 750000

schema = spktyp.StructType(
    [spktyp.StructField("ID", spktyp.DoubleType(), True)]
)

df = spark.createDataFrame(
    [(float(i),) for i in range(m)],
    schema
)

for j in range(z):
    df = df.withColumn(
        f"N{j}",
        F.col("ID") + float(j)
    )

df = df.withColumn(
    "X",
    F.array(
        F.lit("A"),
        F.lit("B"),
        F.lit("C"),
        F.lit("D"),
        F.lit("E")
    ).getItem(
        (F.rand()*3).cast("int")
    )
)

# Set the column names for grouping, input and output --------------------------
group_col = "X"
in_col = "N0"
out_col = "EP"

# Extract different group ids in grouping variable -----------------------------
rows = df.select(group_col).distinct().collect()
groups = [row[group_col] for row in rows]
print(f"Groups: {groups}")

# Split and treat the first id -------------------------------------------------
first, *others = groups

cur_df = df.filter(F.col(group_col) == first)
result = cur_df.withColumn(
    out_col,
    predict(in_col)
)

# Traverse the remaining group ids ---------------------------------------------
for i, other in enumerate(others):
    cur_df = df.filter(F.col(group_col) == other)
    new_df = cur_df.withColumn(
        out_col,
        predict(in_col)
    )

    # Incremental union --------------------------------------------------------
    result = result.unionByName(new_df)

# Save to disk -----------------------------------------------------------------
result.write.mode("overwrite").parquet(out_path)

Shockingly (at least for me), the problem seems to vanish if I put a call to repartition() just before the serialization statement.

result = result.repartition(result.rdd.getNumPartitions())
result.write.mode("overwrite").parquet(out_path)

With this line in place, I can lower the driver memory configuration a lot, and the script runs fine. I can barely understand the relationship among all these factors, although I suspect lazy evaluation and pyarrow serialization might be involved.
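
As a small diagnostic sketch (not part of the original script), the physical plan can be printed before and after the repartition() call to see how the chain of unions over the per-group, Arrow-evaluated UDF calls grows with the number of groups:

result.explain(True)
print("partitions:", result.rdd.getNumPartitions())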

This is the current environment I am using for development:

arrow-cpp                 0.13.0           py36hee3af98_1    conda-forge
asn1crypto                0.24.0                py36_1003    conda-forge
astroid                   2.2.5                    py36_0
atomicwrites              1.3.0                      py_0    conda-forge
attrs                     19.1.0                     py_0    conda-forge
blas                      1.0                         mkl
boost-cpp                 1.68.0            h6a4c333_1000    conda-forge
brotli                    1.0.7             he025d50_1000    conda-forge
ca-certificates           2019.3.9             hecc5488_0    conda-forge
certifi                   2019.3.9                 py36_0    conda-forge
cffi                      1.12.3           py36hb32ad35_0    conda-forge
chardet                   3.0.4                 py36_1003    conda-forge
colorama                  0.4.1                    py36_0
cryptography              2.6.1            py36hb32ad35_0    conda-forge
dill                      0.2.9                    py36_0
docopt                    0.6.2                    py36_0
entrypoints               0.3                      py36_0
falcon                    1.4.1.post1     py36hfa6e2cd_1000    conda-forge
fastavro                  0.21.21          py36hfa6e2cd_0    conda-forge
flake8                    3.7.7                    py36_0
future                    0.17.1                py36_1000    conda-forge
gflags                    2.2.2                ha925a31_0
glog                      0.3.5                h6538335_1
hug                       2.5.2            py36hfa6e2cd_0    conda-forge
icc_rt                    2019.0.0             h0cc432a_1
idna                      2.8                   py36_1000    conda-forge
intel-openmp              2019.3                      203
isort                     4.3.17                   py36_0
lazy-object-proxy         1.3.1            py36hfa6e2cd_2
libboost                  1.67.0               hd9e427e_4
libprotobuf               3.7.1                h1a1b453_0    conda-forge
lz4-c                     1.8.1.2              h2fa13f4_0
mccabe                    0.6.1                    py36_1
mkl                       2018.0.3                      1
mkl_fft                   1.0.6            py36hdbbee80_0
mkl_random                1.0.1            py36h77b88f5_1
more-itertools            4.3.0                 py36_1000    conda-forge
ninabrlong                0.1.0                     dev_0    <develop>
nose                      1.3.7                 py36_1002    conda-forge
nose-exclude              0.5.0                      py_0    conda-forge
numpy                     1.15.0           py36h9fa60d3_0
numpy-base                1.15.0           py36h4a99626_0
openssl                   1.1.1b               hfa6e2cd_2    conda-forge
pandas                    0.23.3           py36h830ac7b_0
parquet-cpp               1.5.1                         2    conda-forge
pip                       19.0.3                   py36_0
pluggy                    0.11.0                     py_0    conda-forge
progressbar2              3.38.0                     py_1    conda-forge
py                        1.8.0                      py_0    conda-forge
py4j                      0.10.7                   py36_0
pyarrow                   0.13.0           py36h8c67754_0    conda-forge
pycodestyle               2.5.0                    py36_0
pycparser                 2.19                     py36_1    conda-forge
pyflakes                  2.1.1                    py36_0
pygam                     0.8.0                      py_0    conda-forge
pylint                    2.3.1                    py36_0
pyopenssl                 19.0.0                   py36_0    conda-forge
pyreadline                2.1                      py36_1
pysocks                   1.6.8                 py36_1002    conda-forge
pyspark                   2.4.1                      py_0
pytest                    4.5.0                    py36_0    conda-forge
pytest-runner             4.4                        py_0    conda-forge
python                    3.6.6                hea74fb7_0
python-dateutil           2.8.0                    py36_0
python-hdfs               2.3.1                      py_0    conda-forge
python-mimeparse          1.6.0                      py_1    conda-forge
python-utils              2.3.0                      py_1    conda-forge
pytz                      2019.1                     py_0
re2                       2019.04.01       vc14h6538335_0  [vc14]  conda-forge
requests                  2.21.0                py36_1000    conda-forge
requests-kerberos         0.12.0                   py36_0
scikit-learn              0.20.1           py36hb854c30_0
scipy                     1.1.0            py36hc28095f_0
setuptools                41.0.0                   py36_0
six                       1.12.0                   py36_0
snappy                    1.1.7                h777316e_3
sqlite                    3.28.0               he774522_0
thrift-cpp                0.12.0            h59828bf_1002    conda-forge
typed-ast                 1.3.1            py36he774522_0
urllib3                   1.24.2                   py36_0    conda-forge
vc                        14.1                 h0510ff6_4
vs2015_runtime            14.15.26706          h3a45250_0
wcwidth                   0.1.7                      py_1    conda-forge
wheel                     0.33.1                   py36_0
win_inet_pton             1.1.0                    py36_0    conda-forge
wincertstore              0.2              py36h7fe50ca_0
winkerberos               0.7.0                    py36_1
wrapt                     1.11.1           py36he774522_0
xz                        5.2.4                h2fa13f4_4
zlib                      1.2.11               h62dcd97_3
zstd                      1.3.3                hfe6a214_0

Any hint or help would be much appreciated.

Answer

I wanted to comment on your post, but my reputation is too low.

In my experience, UDFs slow down your performance drastically, especially if you write them in Python (or pandas). There is an article on why you shouldn't use Python UDFs and should use Scala UDFs instead: https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9

In my case it was possible to use built-in functions, even though it was pretty complicated, and the runtime decreased to about 5% of what it was before.
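
For the dummy predict() in your example the built-in equivalent is a one-liner, and since it does not depend on the group it would also remove the whole filter/union loop (a sketch of my suggestion, reusing the names from your script):

result = df.withColumn(out_col, F.col(in_col) + 100.0)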

I have no explanation for your OOM error or why a repartition worked for you. The only advice I can give is to avoid UDFs as much as possible, although that does not seem to be easy in your case.
