How to join two DataFrames and change column for missing values?
Problem description
val df1 = sc.parallelize(Seq(
("a1",10,"ACTIVE","ds1"),
("a1",20,"ACTIVE","ds1"),
("a2",50,"ACTIVE","ds1"),
("a3",60,"ACTIVE","ds1"))
).toDF("c1","c2","c3","c4")
val df2 = sc.parallelize(Seq(
("a1",10,"ACTIVE","ds2"),
("a1",20,"ACTIVE","ds2"),
("a1",30,"ACTIVE","ds2"),
("a1",40,"ACTIVE","ds2"),
("a4",20,"ACTIVE","ds2"))
).toDF("c1","c2","c3","c5")
df1.show()
// +---+---+------+---+
// | c1| c2| c3| c4|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds1|
// | a1| 20|ACTIVE|ds1|
// | a2| 50|ACTIVE|ds1|
// | a3| 60|ACTIVE|ds1|
// +---+---+------+---+
df2.show()
// +---+---+------+---+
// | c1| c2| c3| c5|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds2|
// | a1| 20|ACTIVE|ds2|
// | a1| 30|ACTIVE|ds2|
// | a1| 40|ACTIVE|ds2|
// | a4| 20|ACTIVE|ds2|
// +---+---+------+---+
My requirement is: I need to join both DataFrames. The output DataFrame should contain all the records from df1, plus the records from df2 that are not in df1, but only for matching "c1" values. The records pulled from df2 should have column "c3" updated to INACTIVE.
In this example the only matching value of "c1" is a1. So I need to pull the c2=30 and c2=40 records from df2 and make them INACTIVE.
This is the expected output.
df_output.show()
// +---+---+--------+---+
// | c1| c2| c3 | c4|
// +---+---+--------+---+
// | a1| 10|ACTIVE |ds1|
// | a1| 20|ACTIVE |ds1|
// | a2| 50|ACTIVE |ds1|
// | a3| 60|ACTIVE |ds1|
// | a1| 30|INACTIVE|ds1|
// | a1| 40|INACTIVE|ds1|
// +---+---+--------+---+
Can anyone help me do this?
Recommended answer
First, a small thing. I use different names for the columns in df2:
val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4")
No big deal, but this made things easier for me to reason about.
Now for the fun stuff. I am going to be a bit verbose for the sake of clarity:
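// lit builds a constant column; spark-shell may already have this import in scope
import org.apache.spark.sql.functions.lit
val join = df1
  .join(df2, df1("c1") === df2("d1"), "inner")
  .select($"d1", $"d2", $"d3", lit("ds1").as("d4"))
  .dropDuplicates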
Here I do the following:
- Inner join between df1 and df2 on the c1 and d1 columns
- Select the df2 columns and simply "hardcode" ds1 in the last column to replace ds2
- Drop duplicates
This basically just filters out everything in df2 that does not have a corresponding key in c1 in df1.
Next, I compute the difference:
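As a side note, the same filtering can probably be expressed with a left semi join, which keeps each df2 row at most once when its key exists in df1, so no dropDuplicates is needed. The following is a minimal sketch, not the answer's own method. It assumes a local SparkSession and the renamed d1..d4 columns from above; the object name SemiJoinSketch and the helper matchingRows are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical wrapper object; not part of the original answer.
object SemiJoinSketch {
  // Keeps the df2 rows whose d1 value also appears in df1's c1 column,
  // and hardcodes "ds1" in the last column, as the answer does.
  def matchingRows(df1: DataFrame, df2: DataFrame): DataFrame =
    df2
      .join(df1, df2("d1") === df1("c1"), "left_semi")
      .select(col("d1"), col("d2"), col("d3"), lit("ds1").as("d4"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for this small example.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("semi-join-sketch")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("a1", 10, "ACTIVE", "ds1"),
      ("a1", 20, "ACTIVE", "ds1"),
      ("a2", 50, "ACTIVE", "ds1"),
      ("a3", 60, "ACTIVE", "ds1")
    ).toDF("c1", "c2", "c3", "c4")

    val df2 = Seq(
      ("a1", 10, "ACTIVE", "ds2"),
      ("a1", 20, "ACTIVE", "ds2"),
      ("a1", 30, "ACTIVE", "ds2"),
      ("a1", 40, "ACTIVE", "ds2"),
      ("a4", 20, "ACTIVE", "ds2")
    ).toDF("d1", "d2", "d3", "d4")

    matchingRows(df1, df2).show()
    spark.stop()
  }
}
```

One difference worth keeping in mind: if df2 itself contained exact duplicate rows, the semi join would keep them, whereas the inner join followed by dropDuplicates collapses them. On the sample data both approaches yield the same four a1 rows.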
val diff = join
.except(df1)
.select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")
This is a basic set operation that finds everything in join that is not in df1. These are the items to deactivate, so I select all the columns but replace the third with a hardcoded INACTIVE value.
All that's left is to put them all together:
df1.union(diff)
This simply combines df1 with the table of deactivated values we calculated earlier to produce the final result:
+---+---+--------+---+
| c1| c2| c3| c4|
+---+---+--------+---+
| a1| 10| ACTIVE|ds1|
| a1| 20| ACTIVE|ds1|
| a2| 50| ACTIVE|ds1|
| a3| 60| ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
+---+---+--------+---+
And again, you don't need all these intermediate values. I was just verbose to help trace through the process.
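To illustrate, the three steps can be chained into a single expression. This is a sketch under a few assumptions: a local SparkSession, df2 already using the d1..d4 column names, and a hypothetical wrapper object DeactivateMissing with a combine method, both named here only for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical wrapper object; the names are illustrative only.
object DeactivateMissing {
  // Chains the join, except and union steps from the answer.
  // Assumes df1 has columns c1..c4 and df2 has columns d1..d4.
  def combine(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.union(
      df1
        .join(df2, df1("c1") === df2("d1"), "inner")       // keep df2 rows whose key exists in df1
        .select(col("d1"), col("d2"), col("d3"), lit("ds1").as("d4"))
        .dropDuplicates()                                   // the join repeats df2 rows per df1 match
        .except(df1)                                        // drop rows df1 already contains
        .select(col("d1"), col("d2"), lit("INACTIVE").as("d3"), col("d4"))
    )

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for this small example.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("deactivate-missing")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("a1", 10, "ACTIVE", "ds1"),
      ("a1", 20, "ACTIVE", "ds1"),
      ("a2", 50, "ACTIVE", "ds1"),
      ("a3", 60, "ACTIVE", "ds1")
    ).toDF("c1", "c2", "c3", "c4")

    val df2 = Seq(
      ("a1", 10, "ACTIVE", "ds2"),
      ("a1", 20, "ACTIVE", "ds2"),
      ("a1", 30, "ACTIVE", "ds2"),
      ("a1", 40, "ACTIVE", "ds2"),
      ("a4", 20, "ACTIVE", "ds2")
    ).toDF("d1", "d2", "d3", "d4")

    combine(df1, df2).show()
    spark.stop()
  }
}
```

Both except and union match columns by position rather than by name, which is why the d1..d4 columns line up with c1..c4 and the result keeps df1's column names. The row order printed by show() is not guaranteed.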