Update a dataframe column with new values
Question
df1 has fields id and json; df2 has fields id and json.
df1.count() => 1200; df2.count() => 20
df1 has all the rows. df2 has an incremental update with just 20 rows.
My goal is to update df1 with the values from df2. All the ids of df2 are in df1, but df2 has updated values (in the json field) for those same ids.
The resulting df should have all the values from df1 and the updated values from df2.
What is the best way to do this, with the fewest joins and filters?
Thanks!
Answer
You can achieve this using one left join.
Create sample DataFrames
Using the sample data provided by @Shankar Koirala in his answer.
data1 = [
    (1, "a"),
    (2, "b"),
    (3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])

data2 = [
    (1, "x"),
    (2, "y")
]
df2 = sqlCtx.createDataFrame(data2, ["id", "value"])
Do a left join
Join the two DataFrames using a left join on the id column. This will keep all of the rows in the left DataFrame. For the rows in the right DataFrame that don't have a matching id, the value will be null.
import pyspark.sql.functions as f

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.col('l.value').alias('left_value'),
        f.col('r.value').alias('right_value')
    )\
    .show()
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#| 1| a| x|
#| 3| c| null|
#| 2| b| y|
#+---+----------+-----------+
Select the desired data
We will use the fact that the unmatched ids have a null to select the final columns. Use pyspark.sql.functions.when() to take the right value if it is not null, and otherwise keep the left value.
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.when(
            ~f.isnull(f.col('r.value')),
            f.col('r.value')
        ).otherwise(f.col('l.value')).alias('value')
    )\
    .show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| x|
#| 3| c|
#| 2| y|
#+---+-----+
You can sort this output if you want the ids in order.
Using pyspark-sql
You can do the same thing using a pyspark-sql query:
df1.registerTempTable('df1')
df2.registerTempTable('df2')

query = """SELECT l.id,
    CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value
    FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
sqlCtx.sql(query).show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| x|
#| 3| c|
#| 2| y|
#+---+-----+