pySpark withColumn与功能 [英] pySpark withColumn with a function

查看:102
本文介绍了pySpark withColumn与功能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含2列的数据框: account_id email_address ,现在我想再添加一列 updated_email_address ,我称之为一些函数在 email_address 上获取 updated_email_address .这是我的代码:

  def update_email(电子邮件):打印("==要更新的电子邮件:" +电子邮件)今天= datetime.date.today()已更新=子字符串(电子邮件,-8、8)+ str(today.strftime('%m'))+ str(today.strftime('%d'))+"_updated"返回更新df.withColumn('updated_email_address',update_email(df.email_address)) 

,但结果显示 updated_email_address 列为空:

  + --------------- + -------------- ++ --------------------- +|帐户ID |电子邮件地址|更新后的电子邮件地址|+ --------------- + -------------- + -------------------+| 123456gd7tuhha |abc@test.com |空|| djasevneuagsj1 |cde@test.com |空|+ --------------- + -------------- + --------------- + 

在打印出来的函数 updated_email 内:

  Column< b'(电子邮件地址+ ==要被提纯的电子邮件:)'> 

还显示了df的列数据类型为:

 <代码> dfData:pyspark.sql.dataframe.DataFrameaccount_id:字符串email_address:字符串Updated_email_address:双 

为什么 updated_email_address 列的类型为double?

解决方案

您正在调用 Column 类型的Python函数.您必须从 update_email 创建udf,然后使用它:

  update_email_udf = udf(update_email) 

但是,我建议您不要使用UDF进行此类转换,您可以仅使用Spark内置函数(UDF的性能差而闻名):

  df.withColumn('updated_email_address',concat(substring(col(电子邮件地址"),-8,8),date_format(current_date(),"ddMM"),lit("_ updated"))).展示() 

您可以在此处找到所有Spark SQL内置功能.<​​/p>

I have a dataframe which has 2 columns: account_id and email_address, now I want to add one more column updated_email_address which i call some function on email_address to get the updated_email_address. here is my code:

def update_email(email):
  print("== email to be updated: " + email)
  today = datetime.date.today()
  updated = substring(email, -8, 8) + str(today.strftime('%m')) + str(today.strftime('%d')) + "_updated"
  return updated

df.withColumn('updated_email_address', update_email(df.email_address))

but the result showed updated_email_address column as null:

+---------------+--------------+---------------------+
|account_id     |email_address |updated_email_address|
+---------------+--------------+---------------------+
|123456gd7tuhha |abc@test.com  |null           |
|djasevneuagsj1 |cde@test.com  |null           |
+---------------+--------------+---------------+

inside the function updated_email it printed out:

Column<b'(email_address + == email to be udpated: )'>

also it showed the df's column data type as:

dfData:pyspark.sql.dataframe.DataFrame
account_id:string
email_address:string
updated_email_address:double

why is updated_email_address column type of double?

解决方案

You're calling a Python function with Column type. You have to create udf from update_email and then use it:

update_email_udf = udf(update_email)

However, I'd suggest you to not use UDF fot such transformation, you could do it using only Spark built-in functions (UDFs are known for bad performance) :

df.withColumn('updated_email_address',
              concat(substring(col("email_address"), -8, 8), date_format(current_date(), "ddMM"), lit("_updated"))
             ).show()

You can find here all Spark SQL built-in functions.

这篇关于pySpark withColumn与功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆