UnsupportedOperationException: Cannot evaluate expression: .. when adding a new column with withColumn() and udf()


Problem Description

So what I am trying to do is simply convert the fields year, month, day, hour, and minute (which are of integer type, as shown below) into a string.

So I have a dataframe df_src of type:

<class 'pyspark.sql.dataframe.DataFrame'>

and here is its schema:

root
 |-- src_ip: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)

I also declared a function earlier:

def parse_df_to_string(year, month, day, hour=0, minute=0):
    second = 0
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(year, month, day, hour, minute, second)

I also ran a quick test, and it works like a charm:

print parse_df_to_string(2016, 10, 15, 21)
print type(parse_df_to_string(2016, 10, 15, 21))

2016-10-15 21:00:00
<type 'str'>

So I did the equivalent with a udf from the Spark API:

from pyspark.sql.functions import udf
u_parse_df_to_string = udf(parse_df_to_string)
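
(A side note: the return type can also be declared explicitly when registering the udf; StringType() is already the default when it is omitted, so this variant is equivalent:)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Equivalent registration with an explicit return type; StringType() is the
# default when no returnType is passed.
u_parse_df_to_string = udf(parse_df_to_string, StringType())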

And finally, this request:

df_src.select('*', 
              u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
             ).show()

results in:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-126-770b587e10e6> in <module>()
     25 # Could not make this part wor..
     26 df_src.select('*',
---> 27         u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
     28              ).show()

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    285         +---+-----+
    286         """
--> 287         print(self._jdf.showString(n, truncate))
    288 
    289     def __repr__(self):

/opt/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()
    ...


    Py4JJavaError: An error occurred while calling o5074.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: parse_df_to_string(input[1, int, true], input[2, int, true], input[3, int, true], input[4, int, true], input[5, int, true])
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
    at org.apache.spark.sql.execution.python.PythonUDF.doGenCode(PythonUDF.scala:27)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)

...
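
For reference, the title mentions withColumn(); the equivalent call (written here as a hypothetical sketch, not taken from the original post) presumably fails with the same exception, since it builds the same projection:

# Hypothetical withColumn() equivalent of the failing select above;
# 'time' is only an illustrative column name.
df_src.withColumn(
    'time',
    u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'],
                         df_src['hour'], df_src['minute'])
).show()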

I tried many things; I tried calling the method with only one parameter and argument, but it did not help.

One way that did work, though, was to create a new dataframe with a new column, as follows:

from pyspark.sql.functions import concat, col, lit

df_src_grp_hr_d = df_src.select('*', concat(
    col("year"),
    lit("-"),
    col("month"),
    lit("-"),
    col("day"),
    lit(" "),
    col("hour"),
    lit(":0")).alias('time'))

after which I could cast that column to a timestamp:

df_src_grp_hr_to_timestamp = df_src_grp_hr_d.select(
    df_src_grp_hr_d['src_ip'],
    df_src_grp_hr_d['year'],
    df_src_grp_hr_d['month'],
    df_src_grp_hr_d['day'],
    df_src_grp_hr_d['hour'],
    df_src_grp_hr_d['time'].cast('timestamp'))
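
(A compact variant of this workaround, not part of the original question: build the zero-padded string with the built-in format_string() and cast it in one pass. A sketch, untested against the original data:)

from pyspark.sql.functions import format_string

# Sketch: build "yyyy-MM-dd HH:mm:ss" with a built-in function instead of a
# Python udf, then cast the result to a timestamp.
df_ts = df_src.select(
    '*',
    format_string(
        "%04d-%02d-%02d %02d:%02d:00",
        df_src['year'], df_src['month'], df_src['day'],
        df_src['hour'], df_src['minute']
    ).cast('timestamp').alias('time'))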

Recommended Answer

Alright, I think I understand the problem. The cause is that my dataframe simply had a lot of data loaded in memory, which made the show() action fail.

The way I realized it is that what is causing the exception:

Py4JJavaError: An error occurred while calling o2108.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: 

is indeed the df.show() action.

I could confirm that by executing the code snippet from: Convert pyspark string to date format

from datetime import datetime
from pyspark.sql.functions import col, udf, unix_timestamp
from pyspark.sql.types import DateType

# Creation of a dummy dataframe:
df1 = sqlContext.createDataFrame([("11/25/1991", "11/24/1991", "11/30/1991"),
                                  ("11/25/1391", "11/24/1992", "11/30/1992")],
                                 schema=['first', 'second', 'third'])

# Setting a user-defined function:
# This function converts the string cell into a date
# (note: '%m' is the month directive; '%M' would be minutes).
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

df = df1.withColumn('test', func(col('first')))

df.show()

df.printSchema()

which worked! But it still did not work with my dataframe df_src.

The cause is that I am loading a lot of data into memory from my database server (over 8-9 million rows), and it seems that Spark is unable to execute the udf when .show() (which displays 20 entries by default) is called on the results loaded into the dataframe.

Even if show(n=1) is called, the same exception is thrown.

But if printSchema() is called, you can see that the new column is effectively added.

One way to see whether the new column was added is simply to call the action print dataFrame.take(10) instead.

Finally, one way to make it work is to assign the result to a new dataframe, and not call .show() when calling the udf inside the select():

df_to_string = df_src.select('*',
    u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'],
                         df_src['hour'], df_src['minute'])
)

Then cache it:

df_to_string.cache()

Now .show() can be called with no issues:

df_to_string.show()
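
As suggested above, an action like take() is another way to materialize a few rows without going through showString(); for example:

# Prints the first 10 rows as a list of Row objects.
print df_to_string.take(10)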

