Python 中的 Spark 数据帧 - 使用 UDF 时执行卡住 [英] Spark dataframe in Python - execution stuck when using UDFs

查看:38
本文介绍了Python 中的 Spark 数据帧 - 使用 UDF 时执行卡住的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个用 Python 编写的 Spark 作业,它使用 DataBricks CSV 阅读器从 CSV 文件中读取数据.

I have a spark job written in Python which is reading data from the CSV files using DataBricks CSV reader.

我想通过应用 udf 函数将一些列从字符串转换为双精度,该函数实际上也在更改浮点分隔符.

I want to convert some columns from string to double by applying an udf function which actually is also changing the floating point separator.

convert_udf = F.udf(
    lambda decimal_str: _to_float(decimal_separator, decimal_str), 
    returnType=FloatType())

for name in columns:
     df = df.withColumn(name, convert_udf(df[name]))

def _to_float(decimal_separator, decimal_str):
    if isinstance(decimal_str, str) or isinstance(decimal_str, unicode):
        return (None if len(decimal_str.strip()) == 0 
               else float(decimal_str.replace(decimal_separator, '.')))
    else:
        return decimal_str

调用 udf 函数时,Spark 作业卡住了.我试图从 _to_float 函数返回一个固定的 double 值,但没有成功.使用 SQL 上下文的 udf 和数据框之间似乎有问题.

The Spark job is getting stuck when the udf function is called. I tried to return a fixed double value from the _to_float function without success. It looks like there is something wrong between the udf and data frame using SQL context.

推荐答案

长话短说,除非必要,否则不要使用 Python UDF(以及一般的 UDF):

Long story short don't use Python UDFs (and UDFs in general) unless it is necessary:

  • 由于通过 Python 解释器进行全程往返,效率低下
  • 无法通过 Catalyst 优化
  • 如果反复使用会创建很长的谱系

对于像这样的简单操作,只需使用内置函数:

For simple operations like this one just use built-in functions:

from pyspark.sql.functions import regexp_replace

decimal_separator = ","
exprs = [
    regexp_replace(c, decimal_separator, ".").cast("float").alias(c) 
    if c in columns else c 
    for c in df.columns
]

df.select(*exprs)

这篇关于Python 中的 Spark 数据帧 - 使用 UDF 时执行卡住的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆