How do I upsert into HDFS with spark?

Question

I have partitioned data in the HDFS. At some point I decide to update it. The algorithm is:

  • Read the new data from a Kafka topic.
  • Find out the partition names of the new data.
  • Load the data from the partitions with these names in HDFS.
  • Merge the HDFS data with the new data.
  • Overwrite the partitions that already exist on disk (a sketch of these steps follows right after this list).
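
A minimal sketch of these steps in Scala, assuming a dataset partitioned by date and key under a hypothetical root path /data/events; the Kafka read and the actual merge/deduplication logic are stubbed out because they depend on the real schema:

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val basePath = "/data/events"    // assumption: root of the partitioned dataset
val newData: DataFrame = ???     // step 1: the parsed data read from the Kafka topic (stubbed)

// Step 2: find out which partitions the new data touches (using only the date level for brevity).
val affectedDates = newData.select($"date".cast("string")).distinct().as[String].collect()

// Step 3: load only the touched partitions from HDFS.
val existing = spark.read.format("parquet").load(basePath)
  .where(col("date").isin(affectedDates: _*))

// Step 4: merge; the real upsert logic (deduplication by business key) would go here.
val merged = existing.unionByName(newData)

// Step 5: overwrite only the partitions present in `merged`.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
merged.write
  .mode(SaveMode.Overwrite)
  .partitionBy("date", "key")
  .format("parquet")
  .save(basePath)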

The problem is: what if the new data has partitions that don't exist on disk yet? In that case they don't get written. https://stackoverflow.com/a/49691528/10681828 <- this solution, for example, doesn't write new partitions.

The above picture describes the situation. Let's think of the left disk as being the partitions that are already in HDFS and of the right disk as partitions that we just received from Kafka.

Some of the partitions of the right disk will intersect with the already existing ones, the others won't. And this code:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("date", "key")
    .option("header", "true")
    .format(format)
    .save(path)

is not able to write the blue part of the picture to disk.

So, how do I resolve this issue? Please provide code. I am looking for something performant.

An example for anyone who didn't follow:

Suppose we have this data in the HDFS:

  • PartitionA has data "1"
  • PartitionB has data "1"

Now we receive this new data:

  • PartitionB has data "2"
  • PartitionC has data "1"

So, partitions A and B are in HDFS, partitions B and C are the new ones, and since B is already in HDFS we update it. I also want C to be written, so the end result should look like this:

  • PartitionA has data "1"
  • PartitionB has data "2"
  • PartitionC has data "1"

But if I use the code from above, I get this:

  • PartitionA has data "1"
  • PartitionB has data "2"

Because the new dynamic overwrite feature from Spark 2.3 is not able to create PartitionC.

Update: It turns out that if you use hive tables instead, this will work. But if you use pure spark it doesn't... So, I guess hive's overwrite and spark's overwrite work differently.
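
For reference, a hedged sketch of the Hive-table variant that appeared to work, assuming Hive support is enabled and a table named db.events already exists, partitioned by (date, key); insertInto respects dynamic partition overwrite and also creates partitions that are not there yet:

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Assumption: a partitioned Hive table `db.events` already exists.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

val merged: DataFrame = ???   // the merged data from the steps above (stubbed)

// insertInto matches columns by position, so the column order must match the table,
// with the partition columns (date, key) last; do not combine it with partitionBy.
merged.write
  .mode(SaveMode.Overwrite)
  .insertInto("db.events")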

Answer

In the end I just decided to delete that "green" subset of partitions from HDFS, and use SaveMode.Append instead. I think this is a bug in spark.
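
A hedged sketch of that workaround, assuming the same hypothetical layout as above (root path /data/events, partition columns date and key): first delete the partition directories touched by the new data with the Hadoop FileSystem API, then append the merged data, since SaveMode.Append never drops untouched partitions and happily creates new ones:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val basePath = "/data/events"   // assumption: root of the partitioned dataset
val merged: DataFrame = ???     // assumption: the already merged HDFS + Kafka data (stubbed)

// Delete the intersecting ("green") partition directories from HDFS first.
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
merged.select($"date".cast("string"), $"key".cast("string")).distinct()
  .as[(String, String)].collect().foreach { case (d, k) =>
    val dir = new Path(s"$basePath/date=$d/key=$k")
    if (fs.exists(dir)) fs.delete(dir, true)   // recursive delete of one partition
  }

// Then append: untouched partitions stay as they are, missing ones get created.
merged.write
  .mode(SaveMode.Append)
  .partitionBy("date", "key")
  .format("parquet")
  .save(basePath)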
