Spark增量加载覆盖旧记录 [英] Spark incremental loading overwrite old record
本文介绍了Spark增量加载覆盖旧记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我需要使用 Spark (PySpark) 对表进行增量加载
示例如下:
第一天
id |价值-----------1 |美国广播公司2 |定义
第二天
id |价值-----------2 |编码器3 |xyz
预期结果
id |价值-----------1 |美国广播公司2 |编码器3 |xyz
这可以在关系数据库中轻松完成,
想知道这是否可以在 Spark 或其他转换工具中完成,例如快点?
解决方案
给你!第一个数据框:
>>>list1 = [(1, 'abc'),(2,'def')]>>>olddf = spark.createDataFrame(list1, ['id', 'value'])>>>olddf.show();+---+-----+|ID|值|+---+-----+|1|abc||2|定义|+---+-----+
第二个数据框:
<预><代码>>>>list2 = [(2, 'cde'),(3,'xyz')]>>>newdf = spark.createDataFrame(list2, ['id', 'value'])>>>newdf.show();+---+-----+|ID|值|+---+-----+|2|编码||3|xyz|+---+-----+现在使用全外连接连接和合并这两个数据名,并在选择时使用合并功能,并且可以用用户定义的值替换空值.
from pyspark.sql.functions import *>>>df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))>>>df.show();+---+-----+|ID|值|+---+-----+|1|abc||3|xyz||2|编码|+---+-----+
我希望这能解决您的问题.:-)
I have a requirement to do the incremental loading to a table by using Spark (PySpark)
Here's the example:
Day 1
id | value
-----------
1 | abc
2 | def
Day 2
id | value
-----------
2 | cde
3 | xyz
Expected result
id | value
-----------
1 | abc
2 | cde
3 | xyz
This can be done easily in relational database,
Wondering whether this can be done in Spark or other transformational tool, e.g. Presto?
解决方案
Here you go! First Dataframe:
>>> list1 = [(1, 'abc'),(2,'def')]
>>> olddf = spark.createDataFrame(list1, ['id', 'value'])
>>> olddf.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 2| def|
+---+-----+
Second Dataframe:
>>> list2 = [(2, 'cde'),(3,'xyz')]
>>> newdf = spark.createDataFrame(list2, ['id', 'value'])
>>> newdf.show();
+---+-----+
| id|value|
+---+-----+
| 2| cde|
| 3| xyz|
+---+-----+
Now join and merge these two datafame using full outer join and use coalesce function while select and can replace the null values wih user defined values.
from pyspark.sql.functions import *
>>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value"))
>>> df.show();
+---+-----+
| id|value|
+---+-----+
| 1| abc|
| 3| xyz|
| 2| cde|
+---+-----+
I hope this should solve your problem. :-)
这篇关于Spark增量加载覆盖旧记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文