Spark增量加载会覆盖旧记录 [英] Spark incremental loading overwrite old record

查看:90
本文介绍了Spark增量加载会覆盖旧记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要使用Spark(PySpark)对表进行增量加载

I have a requirement to do the incremental loading to a table by using Spark (PySpark)

这里是例子:

第1天

id | value
-----------
1  | abc
2  | def

第2天

id | value
-----------
2  | cde
3  | xyz

预期结果

id | value
-----------
1  | abc
2  | cde
3  | xyz

这可以在关系数据库中轻松完成,
想知道这是否可以在Spark或其他转换工具中完成,例如Presto?

This can be done easily in relational database,
Wondering whether this can be done in Spark or other transformational tool, e.g. Presto?

推荐答案

去这里! 第一个数据框:

Here you go! First Dataframe:

 >>> list1 = [(1, 'abc'),(2,'def')]
 >>> olddf = spark.createDataFrame(list1, ['id', 'value'])
 >>> olddf.show();
 +---+-----+
 | id|value|
 +---+-----+
 |  1|  abc|
 |  2|  def|
 +---+-----+

第二个数据框:

>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
|  2|  cde|
|  3|  xyz|
+---+-----+

现在使用完全外部联接来联接和合并这两个数据名望,并在选择并可以使用用户定义的值替换空值时使用合并功能.

Now join and merge these two datafame using full outer join and use coalesce function while select and can replace the null values wih user defined values.

from pyspark.sql.functions import *

>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  3|  xyz|
|  2|  cde|
+---+-----+

我希望这可以解决您的问题. :-)

I hope this should solve your problem. :-)

这篇关于Spark增量加载会覆盖旧记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆