pyspark dataframe withColumn命令不起作用 [英] pyspark dataframe withColumn command not working
问题描述
我有一个输入数据框:df_input( 更新的df_input )
I have a input dataframe: df_input (updated df_input)
|comment|inp_col|inp_val|
|11 |a |a1 |
|12 |a |a2 |
|15 |b |b3 |
|16 |b |b4 |
|17 |c |&b |
|17 |c |c5 |
|17 |d |&c |
|17 |d |d6 |
|17 |e |&d |
|17 |e |e7 |
我想将inp_val列中的变量替换为其值。我尝试使用下面的代码创建一个新列。
I want to replace the variable in inp_val column to its value. I have tried with the below code to create a new column.
记录以'&'开头的值列表
Taken the list of values which starts with '&'
df_new = df_inp.select(inp_val).where(df.inp_val.substr(0, 1) == '&')
现在,我遍历该列表以将'&'列值数据替换为其原始列表。
Now I'm iterating over the list to replace the '&' column value data to it original list.
for a in [row[inp_val] for row in df_new.collect()]
df_inp = df_inp.withColumn
(
'new_col',
when(df.inp_val.substr(0, 1) == '&',
[row[inp_val] for row in df.select(df.inp_val).where(df.inp_col == a[1:]).collect()])
.otherwise(df.inp_val)
)
但是,我出现以下错误:
But, I'm getting error as below:
Java.lang.RuntimeException: Unsupported literal tpe class java.util.ArrayList [[5], [6]]
基本上我希望输出如下。 请检查并让我知道错误出在哪里??? 。
我在想根据以上代码尝试插入两种类型的数据类型值?
Basically I want the output as below. Please check and let me know where is the error???. I was thinking that two type of datatype values I'm trying to insert as per the above code??
更新的代码行 :
tst_1 = tst.withColumn("col3_extract", when(tst.col3.substr(0, 1) == '&', regexp_replace(tst.col3, "&", "")).otherwise(""))
# Select which values need to be replaced; withColumnRenamed will also solve spark self join issues
# The substring search can also be done using regex function
tst_filter=tst.where(~F.col('col3').contains('&')).withColumnRenamed('col2','col2_collect')
# For the selected data, perform a collect list
tst_clct = tst_filter.groupby('col2_collect').agg(F.collect_list('col3').alias('col3_collect'))
#%% Join the main table with the collected list
tst_join = tst_1.join(tst_clct,on=tst_1.col3_extract==tst_clct.col2_collect,how='left').drop('col2_collect')
#%% In the column3 replace the values such as a, b
tst_result = tst_join.withColumn("result",F.when(~F.col('col3').contains('&'),F.array(F.col('col3'))).otherwise(F.col('col3_collect')))
但是,上述代码不适用于多次迭代
更新后的预期输出 :
|comment|inp_col|inp_val|new_col |
|11 |a |a1 |['a1'] |
|12 |a |a2 |['a2'] |
|15 |b |b3 |['b3'] |
|16 |b |b4 |['b4'] |
|17 |c |&b |['b3', 'b4'] |
|18 |c |c5 |['c5'] |
|19 |d |&c |['b3', 'b4', 'c5'] |
|20 |d |d6 |['d6'] |
|21 |e |&d |['b3', 'b4', 'c5', 'd6'] |
|22 |e |e7 |['e7'] |
推荐答案
尝试一下, 自我加入
,并在 类似的加入中使用
是必经之路。收集的列表
条件
Try this, self-join
with collected list
on rlike join condition
is the way to go.
df.show() #sampledataframe
#+-------+---------+---------+
#|comment|input_col|input_val|
#+-------+---------+---------+
#| 11| a| 1|
#| 12| a| 2|
#| 15| b| 5|
#| 16| b| 6|
#| 17| c| &b|
#| 17| c| 7|
#+-------+---------+---------+
df.join(df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))\
.withColumnRenamed("input_col","x1"),F.expr("""input_val rlike x1"""),'left')\
.withColumn("new_col", F.when(F.col("input_val").cast("int").isNotNull(), F.array("input_val"))\
.otherwise(F.col("y1"))).drop("x1","y1").show()
#+-------+---------+---------+-------+
#|comment|input_col|input_val|new_col|
#+-------+---------+---------+-------+
#| 11| a| 1| [1]|
#| 12| a| 2| [2]|
#| 15| b| 5| [5]|
#| 16| b| 6| [6]|
#| 17| c| &b| [5, 6]|
#| 17| c| 7| [7]|
#+-------+---------+---------+-------+
这篇关于pyspark dataframe withColumn命令不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!