Update a dataframe column with new values


Question

df1 has fields id and json; df2 has fields id and json.

df1.count() => 1200; df2.count() => 20

df1 has all the rows. df2 has an incremental update with just 20 rows.

My goal is to update df1 with the values from df2. All the ids of df2 are in df1, but df2 has updated values (in the json field) for those same ids.

The resulting df should have all the values from df1, with the updated values from df2.

What is the best way to do this, with the fewest joins and filters?

Thanks!

Answer

You can achieve this using one left join.

Create sample DataFrames

Using the sample data provided by @Shankar Koirala in his answer.

# sqlCtx is an existing SQLContext (a SparkSession also works in Spark 2.0+)
data1 = [
  (1, "a"),
  (2, "b"),
  (3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])

data2 = [
  (1, "x"),
  (2, "y")
]

df2 = sqlCtx.createDataFrame(data2, ["id", "value"])
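
For reference, displaying the first sample DataFrame produces the following (a sketch; row order is not guaranteed in general):

df1.show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    a|
#|  2|    b|
#|  3|    c|
#+---+-----+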

Do the left join

Join the two DataFrames using a left join on the id column. This will keep all of the rows in the left DataFrame. For the rows in the right DataFrame that don't have a matching id, the value will be null.

import pyspark.sql.functions as f
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.col('l.value').alias('left_value'),
        f.col('r.value').alias('right_value')
    )\
    .show()
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#|  1|         a|          x|
#|  3|         c|       null|
#|  2|         b|          y|
#+---+----------+-----------+

Select the desired data

We will use the fact that the unmatched ids are null to select the final columns. Use pyspark.sql.functions.when() to take the right value if it is not null, and otherwise keep the left value.

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.when(
            ~f.isnull(f.col('r.value')),
            f.col('r.value')
        ).otherwise(f.col('l.value')).alias('value')
    )\
    .show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+
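
As a side note, the same null-or-fallback logic can be written more compactly with pyspark.sql.functions.coalesce(), which returns the first non-null column. This is an equivalent formulation, not part of the original answer:

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        # coalesce picks r.value when it is not null, falling back to l.value
        f.coalesce(f.col('r.value'), f.col('l.value')).alias('value')
    )\
    .show()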

You can sort this output if you want the ids in order.
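
For example, appending an explicit sort before the final show() (a minimal sketch reusing the select from above):

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.when(~f.isnull(f.col('r.value')), f.col('r.value'))
         .otherwise(f.col('l.value')).alias('value')
    )\
    .orderBy('id')\
    .show()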

Using pyspark-sql

You can do the same thing using a pyspark-sql query:

df1.registerTempTable('df1')
df2.registerTempTable('df2')

# Spark SQL accepts multi-line query strings, so no newline stripping is needed
query = """SELECT l.id,
CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value
FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
sqlCtx.sql(query).show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+
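
If you are on Spark 2.0 or later, note that registerTempTable is deprecated in favor of createOrReplaceTempView, and the SparkSession entry point (conventionally named spark) replaces sqlCtx. A minimal sketch under that assumption:

df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
spark.sql(query).show()  # same query string as above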
