如何使用来自另一个数据帧的新值更新 pyspark 数据帧? [英] How to update a pyspark dataframe with new values from another dataframe?

查看:35
本文介绍了如何使用来自另一个数据帧的新值更新 pyspark 数据帧?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个火花数据框:

数据框 A:

|col_1 |col_2 |... |col_n ||val_1 |val_2 |... |val_n |

和数据框 B:

|col_1 |col_2 |... |col_m ||val_1 |val_2 |... |val_m |

数据框 B 可以包含来自数据框 A 的重复、更新和新行.我想在 spark 中编写一个操作,我可以在其中创建一个新数据框,其中包含来自数据框 A 的行以及来自数据框 B 的更新和新行.

我首先创建了一个哈希列,其中仅包含不可更新的列.这是唯一标识.所以假设 col1col2 可以改变值(可以更新),但是 col3,..,coln 是唯一的.我创建了一个散列函数 hash(col3,..,coln):

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在我想编写一些火花代码,基本上从 B 中选择散列不在 A (所以新行和更新行) 中的行,并将它们与来自 A 的行.我如何在 pyspark 中实现这一点?

数据框 B 可以有来自数据框 A 的额外列,因此不可能联合.

示例

数据框 A:

+-----+-----+|col_1|col_2|+-----+-----+|一个|www||乙|ee||| |rrr|+-----+-----+

数据框 B:

+-----+-----+-----+|col_1|col_2|col_3|+-----+-----+-----+|一个|我们|1||d|YY|2||| |回复|3|+-----+-----+-----+

结果:数据框 C:

+-----+-----+-----+|col_1|col_2|col_3|+-----+-----+-----+|一个|我们|1||乙|ee|空||| |回复|3||d|YY|2|+-----+-----+-----+

解决方案

这与 使用新值更新数据框列,除了您还想添加来自 DataFrame B 的行.一种方法是首先执行链接问题中概述的操作,然后将结果与 DataFrame 合并B 并删除重复项.

例如:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\.选择('col_1',f.当(~f.isnull(f.col('b.col_2')),f.col('b.col_2')).否则(f.col('a.col_2')).alias('col_2'),'b.col_3')\.union(dfB)\.dropDuplicates()\.sort('col_1')\.表演()#+-----+-----+-----+#|col_1|col_2|col_3|#+-----+-----+-----+#|一个|我们|1|#|乙|ee|空|#|| |回复|3|#|d|YY|2|#+-----+-----+-----+

或者更一般地使用列表推导式,如果您有很多要替换的列并且您不想对它们全部进行硬编码:

cols_to_update = ['col_2']dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\.选择(*[['col_1'] +[f.当(~f.isnull(f.col('b.{}'.format(c))),f.col('b.{}'.format(c))).否则(f.col('a.{}'.format(c))).alias(c)对于 cols_to_update 中的 c] +['b.col_3']])\.union(dfB)\.dropDuplicates()\.sort('col_1')\.表演()

I have two spark dataframes:

Dataframe A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

and dataframe B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

Dataframe B can contain duplicate, updated and new rows from dataframe A. I want to write an operation in spark where I can create a new dataframe containing the rows from dataframe A and the updated and new rows from dataframe B.

I started by creating a hash column containing only the columns that are not updatable. This is the unique id. So let's say col1 and col2 can change value (can be updated), but col3,..,coln are unique. I have created a hash function as hash(col3,..,coln):

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

Now I want to write some spark code that basically selects the rows from B that have the hash not in A (so new rows and updated rows) and join them into a new dataframe together with the rows from A. How can I achieve this in pyspark?

Edit: Dataframe B can have extra columns from dataframe A, so a union is not possible.

Sample example

Dataframe A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

Dataframe B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

Result: Dataframe C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

解决方案

This is closely related to update a dataframe column with new values, except that you also want to add the rows from DataFrame B. One approach would be to first do what is outlined in the linked question and then union the result with DataFrame B and drop duplicates.

For example:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

Or more generically using a list comprehension if you have a lot of columns to replace and you don't want to hard code them all:

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *[
            ['col_1'] + 
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] + 
            ['b.col_3']
        ]
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()

这篇关于如何使用来自另一个数据帧的新值更新 pyspark 数据帧?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆