pyspark dataframe withColumn command not working


Problem description

I have an input dataframe: df_input (updated df_input)

|comment|inp_col|inp_val|
|11     |a      |a1     |
|12     |a      |a2     |
|15     |b      |b3     |
|16     |b      |b4     |
|17     |c      |&b     |
|18     |c      |c5     |
|19     |d      |&c     |
|20     |d      |d6     |
|21     |e      |&d     |
|22     |e      |e7     |

I want to replace each variable in the inp_val column with its value. I tried the code below to create a new column.

First, take the list of values that start with '&':

df_new = df_inp.select('inp_val').where(df_inp.inp_val.substr(1, 1) == '&')

Now I iterate over that list, replacing each '&' reference with its group's original values:

for a in [row['inp_val'] for row in df_new.collect()]:
    df_inp = df_inp.withColumn(
        'new_col',
        when(df_inp.inp_val.substr(1, 1) == '&',
             [row['inp_val'] for row in df_inp.select(df_inp.inp_val)
                                              .where(df_inp.inp_col == a[1:]).collect()])
        .otherwise(df_inp.inp_val))

But I'm getting the error below:

java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [[5], [6]]

Basically I want the output shown below. Please check and let me know where the error is. I suspect the problem is that the two branches of the code above produce two different data types.
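That suspicion is on the right track: the value branch of when() must be a Column, and a plain Python list gets wrapped as a literal, which Spark cannot convert (hence the java.util.ArrayList error). A minimal sketch of a fix, assuming the collected values are already in a Python list (vals below is a hypothetical placeholder, not from the original post):

from pyspark.sql import functions as F

# 'vals' is a hypothetical, already-collected Python list such as ['b3', 'b4'].
vals = ['b3', 'b4']

# Wrap each element in F.lit() inside F.array() so both branches of when()
# produce an array<string> column instead of an unsupported Python literal.
df_inp = df_inp.withColumn(
    'new_col',
    F.when(F.col('inp_val').substr(1, 1) == '&',
           F.array(*[F.lit(v) for v in vals]))
     .otherwise(F.array(F.col('inp_val'))))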

Updated lines of code

from pyspark.sql import functions as F

tst_1 = tst.withColumn("col3_extract", F.when(tst.col3.substr(1, 1) == '&', F.regexp_replace(tst.col3, "&", "")).otherwise(""))
# Select which values need to be replaced; withColumnRenamed will also solve spark self join issues
# The substring search can also be done using a regex function
tst_filter = tst.where(~F.col('col3').contains('&')).withColumnRenamed('col2', 'col2_collect')
# For the selected data, perform a collect_list
tst_clct = tst_filter.groupby('col2_collect').agg(F.collect_list('col3').alias('col3_collect'))
# Join the main table with the collected list
tst_join = tst_1.join(tst_clct, on=tst_1.col3_extract == tst_clct.col2_collect, how='left').drop('col2_collect')
# In column3, replace values such as a, b with the collected list
tst_result = tst_join.withColumn("result", F.when(~F.col('col3').contains('&'), F.array(F.col('col3'))).otherwise(F.col('col3_collect')))

But the above code does not work when multiple iterations are needed (nested '&' references).

Updated expected output

|comment|inp_col|inp_val|new_col                      |
|11     |a      |a1     |['a1']                       |
|12     |a      |a2     |['a2']                       |
|15     |b      |b3     |['b3']                       |
|16     |b      |b4     |['b4']                       |
|17     |c      |&b     |['b3', 'b4']                 |
|18     |c      |c5     |['c5']                       |
|19     |d      |&c     |['b3', 'b4', 'c5']           |
|20     |d      |d6     |['d6']                       |
|21     |e      |&d     |['b3', 'b4', 'c5', 'd6']     |
|22     |e      |e7     |['e7']                       |
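Since this expected output requires nested references to be expanded (&d pulls in &c, which in turn pulls in &b), one illustrative sketch, not from the original post, is to resolve the chain on the driver and join the result back; it assumes the mapping is small enough to collect and that spark and df_inp exist:

from pyspark.sql import functions as F

# Collect the group -> values mapping to the driver (assumes the data is small).
groups = {}
for r in df_inp.select('inp_col', 'inp_val').collect():
    groups.setdefault(r['inp_col'], []).append(r['inp_val'])

# Recursively expand '&x' references; 'seen' guards against reference cycles.
def resolve(vals, seen=()):
    out = []
    for v in vals:
        if v.startswith('&') and v[1:] not in seen:
            out.extend(resolve(groups.get(v[1:], []), seen + (v[1:],)))
        else:
            out.append(v)
    return out

# Map every inp_val to its fully resolved list and join it back on.
mapping = {v: resolve([v]) for vals in groups.values() for v in vals}
map_df = spark.createDataFrame(list(mapping.items()), ['inp_val_key', 'new_col'])
result = df_inp.join(map_df, df_inp.inp_val == map_df.inp_val_key, 'left').drop('inp_val_key')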


Answer

Try this; a self-join with a collected list on an rlike join condition is the way to go.

df.show()  # sample dataframe

#+-------+---------+---------+
#|comment|input_col|input_val|
#+-------+---------+---------+
#|     11|        a|        1|
#|     12|        a|        2|
#|     15|        b|        5|
#|     16|        b|        6|
#|     17|        c|       &b|
#|     17|        c|        7|
#+-------+---------+---------+

from pyspark.sql import functions as F

df.join(df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))\
          .withColumnRenamed("input_col","x1"),F.expr("""input_val rlike x1"""),'left')\
  .withColumn("new_col", F.when(F.col("input_val").cast("int").isNotNull(), F.array("input_val"))\
                    .otherwise(F.col("y1"))).drop("x1","y1").show()

#+-------+---------+---------+-------+
#|comment|input_col|input_val|new_col|
#+-------+---------+---------+-------+
#|     11|        a|        1|    [1]|
#|     12|        a|        2|    [2]|
#|     15|        b|        5|    [5]|
#|     16|        b|        6|    [6]|
#|     17|        c|       &b| [5, 6]|
#|     17|        c|        7|    [7]|
#+-------+---------+---------+-------+
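One caveat, not from the original answer: rlike performs a regex containment test, so a group name that is a substring of another (say a and ab) could match the wrong rows. A hedged variant that joins on exact '&<group>' equality instead, assuming the same column names as the sample above:

from pyspark.sql import functions as F

# Build the group -> collected-values lookup once.
agg = (df.groupBy("input_col")
         .agg(F.collect_list("input_val").alias("y1"))
         .withColumnRenamed("input_col", "x1"))

# Join on exact '&<group>' equality instead of a regex match.
result = (df.join(agg, F.col("input_val") == F.concat(F.lit("&"), F.col("x1")), "left")
            .withColumn("new_col",
                        F.when(F.col("y1").isNull(), F.array("input_val"))
                         .otherwise(F.col("y1")))
            .drop("x1", "y1"))
result.show()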
