Spark dataframe in Python - execution stuck when using UDFs
Question
I have a Spark job written in Python which reads data from CSV files using the Databricks CSV reader.
I want to convert some columns from string to double by applying a UDF that also changes the floating-point separator.
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

def _to_float(decimal_separator, decimal_str):
    # Replace the decimal separator (e.g. ",") with "." and parse;
    # blank strings become None, non-string values pass through.
    # (The check against unicode implies Python 2.)
    if isinstance(decimal_str, (str, unicode)):
        return (None if len(decimal_str.strip()) == 0
                else float(decimal_str.replace(decimal_separator, '.')))
    else:
        return decimal_str

convert_udf = F.udf(
    lambda decimal_str: _to_float(decimal_separator, decimal_str),
    returnType=FloatType())

for name in columns:
    df = df.withColumn(name, convert_udf(df[name]))
The Spark job gets stuck when the UDF is called. I tried returning a fixed double value from _to_float, without success. It looks like something is wrong between the UDF and the DataFrame created through the SQL context.
Accepted answer
Long story short, don't use Python UDFs (or UDFs in general) unless necessary:
- they are inefficient due to the full round trip through the Python interpreter
- they cannot be optimized by Catalyst (see the plan comparison sketched below)
- they create long lineages if applied iteratively
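To make the second point concrete, you can compare the physical plans of the two approaches. This is a minimal sketch rather than part of the original answer; it assumes df has a string column named price (a made-up name), and the exact plan text varies by Spark version:

from pyspark.sql.functions import regexp_replace, udf
from pyspark.sql.types import FloatType

to_float = udf(lambda s: float(s.replace(",", ".")), FloatType())

# The Python UDF shows up as an opaque evaluation step (e.g. BatchEvalPython)
# that Catalyst cannot inspect or optimize.
df.withColumn("price", to_float(df["price"])).explain()

# The built-in expression is folded into a single optimized projection.
df.withColumn("price",
              regexp_replace("price", ",", ".").cast("float")).explain()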
For simple operations like this one, just use built-in functions:
from pyspark.sql.functions import regexp_replace

decimal_separator = ","

# Replace the separator and cast only the target columns,
# keeping all other columns as they are.
exprs = [
    regexp_replace(c, decimal_separator, ".").cast("float").alias(c)
    if c in columns else c
    for c in df.columns
]

df.select(*exprs)
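For illustration, here is a hypothetical end-to-end run. The sample data, the date and price column names, and the sqlContext session are all assumptions for this sketch, not part of the original question:

# Continues from the snippet above (regexp_replace, decimal_separator).
df = sqlContext.createDataFrame(
    [("2016-01-01", "1,5"), ("2016-01-02", ""), ("2016-01-03", "2,25")],
    ["date", "price"])
columns = ["price"]

exprs = [
    regexp_replace(c, decimal_separator, ".").cast("float").alias(c)
    if c in columns else c
    for c in df.columns
]

# Empty strings fail the cast and become null, mirroring the UDF's
# handling of blanks. The output looks roughly like:
# +----------+-----+
# |      date|price|
# +----------+-----+
# |2016-01-01|  1.5|
# |2016-01-02| null|
# |2016-01-03| 2.25|
# +----------+-----+
df.select(*exprs).show()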