How to join two DataFrames and change column for missing values?
Problem description
// Assumes spark-shell, where sc and the toDF implicits are already in scope
val df1 = sc.parallelize(Seq(
("a1",10,"ACTIVE","ds1"),
("a1",20,"ACTIVE","ds1"),
("a2",50,"ACTIVE","ds1"),
("a3",60,"ACTIVE","ds1"))
).toDF("c1","c2","c3","c4")
val df2 = sc.parallelize(Seq(
("a1",10,"ACTIVE","ds2"),
("a1",20,"ACTIVE","ds2"),
("a1",30,"ACTIVE","ds2"),
("a1",40,"ACTIVE","ds2"),
("a4",20,"ACTIVE","ds2"))
).toDF("c1","c2","c3","c5")
df1.show()
// +---+---+------+---+
// | c1| c2| c3| c4|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds1|
// | a1| 20|ACTIVE|ds1|
// | a2| 50|ACTIVE|ds1|
// | a3| 60|ACTIVE|ds1|
// +---+---+------+---+
df2.show()
// +---+---+------+---+
// | c1| c2| c3| c5|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds2|
// | a1| 20|ACTIVE|ds2|
// | a1| 30|ACTIVE|ds2|
// | a1| 40|ACTIVE|ds2|
// | a4| 20|ACTIVE|ds2|
// +---+---+------+---+
My requirement is: I need to join both DataFrames. The output DataFrame should contain all the records from df1, plus the records from df2 that are not in df1, but only for "c1" values that also appear in df1. The records I pull from df2 should have column "c3" set to INACTIVE (and, as the expected output shows, c4 set to ds1).
In this example, the only matching value of "c1" is a1, so I need to pull the records with c2 = 30 and c2 = 40 from df2 and mark them INACTIVE.
This is the expected output:
df_output.show()
// +---+---+--------+---+
// | c1| c2|      c3| c4|
// +---+---+--------+---+
// | a1| 10|  ACTIVE|ds1|
// | a1| 20|  ACTIVE|ds1|
// | a2| 50|  ACTIVE|ds1|
// | a3| 60|  ACTIVE|ds1|
// | a1| 30|INACTIVE|ds1|
// | a1| 40|INACTIVE|ds1|
// +---+---+--------+---+
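To make the requirement and expected output concrete, here is a minimal sketch of the same logic on plain Scala collections (no Spark), assuming a record is identified by its (c1, c2) pair. The case class `Rec`, the object `RequirementSketch` and the function `mergeInactive` are hypothetical names for illustration, not part of any API.

```scala
// Hypothetical record type mirroring columns c1..c4 (df2's c5 plays the role of c4)
final case class Rec(c1: String, c2: Int, c3: String, c4: String)

object RequirementSketch {
  // Keep every row of `left`; append rows of `right` whose c1 also appears in `left`
  // but whose (c1, c2) pair does not, relabelled INACTIVE and tagged with `tag`.
  def mergeInactive(left: Seq[Rec], right: Seq[Rec], tag: String): Seq[Rec] = {
    val leftKeys  = left.map(_.c1).toSet
    val leftPairs = left.map(r => (r.c1, r.c2)).toSet
    val extra = right
      .filter(r => leftKeys.contains(r.c1) && !leftPairs.contains((r.c1, r.c2)))
      .map(r => r.copy(c3 = "INACTIVE", c4 = tag))
      .distinct
    left ++ extra
  }

  def main(args: Array[String]): Unit = {
    val df1 = Seq(Rec("a1", 10, "ACTIVE", "ds1"), Rec("a1", 20, "ACTIVE", "ds1"),
                  Rec("a2", 50, "ACTIVE", "ds1"), Rec("a3", 60, "ACTIVE", "ds1"))
    val df2 = Seq(Rec("a1", 10, "ACTIVE", "ds2"), Rec("a1", 20, "ACTIVE", "ds2"),
                  Rec("a1", 30, "ACTIVE", "ds2"), Rec("a1", 40, "ACTIVE", "ds2"),
                  Rec("a4", 20, "ACTIVE", "ds2"))
    // Prints df1's four rows, then a1/30 and a1/40 as INACTIVE with c4 = ds1
    mergeInactive(df1, df2, "ds1").foreach(println)
  }
}
```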
Can anyone help me do this?
Recommended answer
First, a small thing: I use different names for the columns in df2:
val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4")
No big deal, but this made things easier for me to reason about.
Now for the fun stuff. I am going to be a bit verbose for the sake of clarity:
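import org.apache.spark.sql.functions.lit // $"..." comes from spark.implicits._ (preloaded in spark-shell)
val join = df1
.join(df2, df1("c1") === df2("d1"), "inner")
.select($"d1", $"d2", $"d3", lit("ds1").as("d4"))
.dropDuplicates()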
Here I do the following:
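- Inner join between df1 and df2 on the c1 and d1 columns
- Select the df2 columns and simply "hardcode" ds1 in the last column to replace ds2
- Drop duplicates (the inner join emits each matching df2 row once per df1 row with the same key, so every a1 row from df2 appears twice here)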
This basically just filters out everything in df2 that does not have a corresponding key in c1 in df1.
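A possible alternative to the inner join plus dropDuplicates, not what this answer uses, is a left semi join: it returns only df2's columns and keeps each df2 row at most once when its key exists in df1, however many df1 rows share that key. A minimal sketch, assuming Spark 2.x or later and the renamed d1..d4 columns; `SemiJoinSketch` and `matchingRows` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object SemiJoinSketch {
  // Rows of df2 whose d1 key appears in df1.c1, with d4 overwritten to "ds1".
  // A left semi join outputs only the left side's columns and never repeats a
  // left row, so no dropDuplicates is needed to undo join fan-out.
  def matchingRows(df1: DataFrame, df2: DataFrame): DataFrame =
    df2
      .join(df1, col("d1") === col("c1"), "left_semi")
      .select(col("d1"), col("d2"), col("d3"), lit("ds1").as("d4"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("semi-join").getOrCreate()
    import spark.implicits._

    val df1 = Seq(("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
                  ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1"))
      .toDF("c1", "c2", "c3", "c4")
    val df2 = Seq(("a1", 10, "ACTIVE", "ds2"), ("a1", 20, "ACTIVE", "ds2"),
                  ("a1", 30, "ACTIVE", "ds2"), ("a1", 40, "ACTIVE", "ds2"),
                  ("a4", 20, "ACTIVE", "ds2"))
      .toDF("d1", "d2", "d3", "d4")

    matchingRows(df1, df2).show() // the four a1 rows from df2, each once, with d4 = ds1
    spark.stop()
  }
}
```

Unlike dropDuplicates, the semi join does not remove rows that are already duplicated within df2 itself, so keep a `.dropDuplicates()` if df2 can contain such rows.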
Next, I compute the difference:
val diff = join
.except(df1)
.select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")
This is a basic set operation that finds everything in join that is not in df1. These are the items to deactivate, so I select all the columns but replace the third with a hardcoded INACTIVE value.
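`except` compares all four columns and, like SQL `EXCEPT DISTINCT`, also drops duplicate rows. If a record is identified by (c1, c2) alone, a left anti join is a swapped-in alternative that keeps only the matched rows with no df1 counterpart on that key; it behaves differently from `except` when df1 and df2 disagree on c3 for the same (c1, c2). A minimal sketch, assuming Spark 2.0 or later (where `left_anti` is available); `AntiJoinSketch` and `toDeactivate` are hypothetical names, and `matched` stands for the `join` result computed earlier.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object AntiJoinSketch {
  // Rows of `matched` (columns d1..d4) that have no df1 row with the same (c1, c2),
  // relabelled INACTIVE. A left anti join outputs only the left side's columns.
  def toDeactivate(matched: DataFrame, df1: DataFrame): DataFrame =
    matched
      .join(df1, col("d1") === col("c1") && col("d2") === col("c2"), "left_anti")
      .select(col("d1"), col("d2"), lit("INACTIVE").as("d3"), col("d4"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("anti-join").getOrCreate()
    import spark.implicits._

    val df1 = Seq(("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
                  ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1"))
      .toDF("c1", "c2", "c3", "c4")
    // Stand-in for the deduplicated `join` DataFrame from the answer
    val matched = Seq(("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
                      ("a1", 30, "ACTIVE", "ds1"), ("a1", 40, "ACTIVE", "ds1"))
      .toDF("d1", "d2", "d3", "d4")

    toDeactivate(matched, df1).show() // a1/30 and a1/40, both INACTIVE
    spark.stop()
  }
}
```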
All that's left is to put them all together:
df1.union(diff)
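This simply combines df1 with the table of deactivated values we calculated earlier to produce the final result. Since union matches columns by position rather than by name, the d1..d4 columns line up with c1..c4 and the result keeps df1's column names: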
+---+---+--------+---+
| c1| c2| c3| c4|
+---+---+--------+---+
| a1| 10| ACTIVE|ds1|
| a1| 20| ACTIVE|ds1|
| a2| 50| ACTIVE|ds1|
| a3| 60| ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
+---+---+--------+---+
And again, you don't need all these intermediate values; I was only verbose to make the process easier to trace.
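For reference, here is the answer's own pipeline condensed into a single expression with no intermediate values, wrapped in a small self-contained program. A minimal sketch, assuming Spark 2.x or later and a local SparkSession (on Spark 1.x, use `unionAll` instead of `union`); `JoinAndDeactivate` and `combine` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object JoinAndDeactivate {
  // The answer's steps chained together (df2 uses columns d1..d4):
  // inner join on the key -> keep df2's columns with d4 forced to "ds1" -> dedupe
  // -> except df1 -> relabel the third column INACTIVE -> append to df1.
  // union is positional, so the result keeps df1's names c1..c4.
  def combine(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.union(
      df1.join(df2, df1("c1") === df2("d1"), "inner")
        .select(col("d1"), col("d2"), col("d3"), lit("ds1").as("d4"))
        .dropDuplicates()
        .except(df1)
        .select(col("d1"), col("d2"), lit("INACTIVE").as("d3"), col("d4"))
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-deactivate").getOrCreate()
    import spark.implicits._

    val df1 = Seq(("a1", 10, "ACTIVE", "ds1"), ("a1", 20, "ACTIVE", "ds1"),
                  ("a2", 50, "ACTIVE", "ds1"), ("a3", 60, "ACTIVE", "ds1"))
      .toDF("c1", "c2", "c3", "c4")
    val df2 = Seq(("a1", 10, "ACTIVE", "ds2"), ("a1", 20, "ACTIVE", "ds2"),
                  ("a1", 30, "ACTIVE", "ds2"), ("a1", 40, "ACTIVE", "ds2"),
                  ("a4", 20, "ACTIVE", "ds2"))
      .toDF("d1", "d2", "d3", "d4")

    // Six rows: df1's four plus a1/30 and a1/40 as INACTIVE (row order not guaranteed)
    combine(df1, df2).show()
    spark.stop()
  }
}
```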