How do I upsert into HDFS with spark?


Question

I have partitioned data in HDFS. At some point I decide to update it. The algorithm is as follows (see the sketch after this list):

  • Read the new data from a Kafka topic.
  • Find the partition names of the new data.
  • Load the data from the partitions with those names from HDFS.
  • Merge the HDFS data with the new data.
  • Overwrite the partitions that already exist on disk.
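
Here is a minimal sketch of that flow, assuming a small in-memory DataFrame stands in for the Kafka batch, that rows have a row-level "id" key, and that the data is partitioned by "date" and "key" as in the snippet further down; the path, column names, and sample rows are illustrative assumptions, not the original job:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("hdfs-upsert-sketch").getOrCreate()
import spark.implicits._

val basePath = "/data/events"  // assumed base path of the partitioned dataset

// Stand-in for the batch just read from Kafka (the real job would read the topic here).
val newData = Seq((1L, "2019-01-02", "B", "2"), (2L, "2019-01-03", "C", "1"))
  .toDF("id", "date", "key", "value")

// Find the partition values present in the new batch.
val touched = newData.select("date", "key").distinct().collect()
  .map(r => (r.getString(0), r.getString(1)))

// Load only those partitions from HDFS.
val partitionFilter = touched
  .map { case (d, k) => col("date") === d && col("key") === k }
  .reduce(_ || _)
val existing = spark.read.format("parquet").load(basePath).filter(partitionFilter)

// Merge: new rows win on "id"; existing rows with other ids in those partitions are kept.
val merged = existing.join(newData.select("id"), Seq("id"), "left_anti").unionByName(newData)

// Overwrite the affected partitions (this is where the problem described below shows up).
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
merged.write
  .mode(SaveMode.Overwrite)
  .partitionBy("date", "key")
  .format("parquet")
  .save(basePath)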

The problem is: what if the new data has partitions that don't exist on disk yet? In that case they don't get written. https://stackoverflow.com/a/49691528/10681828 <- this solution doesn't write new partitions, for example.

The picture above describes the situation. Think of the left disk as the partitions that are already in HDFS, and of the right disk as the partitions we just received from Kafka.

Some of the partitions on the right disk intersect with the already existing ones; the others don't. And this code:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("date", "key")
    .option("header", "true")
    .format(format)
    .save(path)

is not able to write the blue part of the picture to disk.

So, how do I resolve this issue? Please provide code. I am looking for something performant.

An example for those who don't understand:

Suppose we have this data in HDFS:

  • Partition A has data "1"
  • Partition B has data "1"

Now we receive this new data:

  • Partition B has data "2"
  • Partition C has data "1"

So, partitions A and B are already in HDFS, and partitions B and C are the new ones. Since B is already in HDFS we update it, and I want C to be written too. The end result should look like this:

  • Partition A has data "1"
  • Partition B has data "2"
  • Partition C has data "1"

But if I use the code above, I get this:

  • Partition A has data "1"
  • Partition B has data "2"

Because the dynamic partition overwrite feature introduced in Spark 2.3 is not able to create partition C.

Update: it turns out that if you use Hive tables instead, this works. But with pure Spark it doesn't... so I guess Hive's overwrite and Spark's overwrite work differently.
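For reference, this is roughly what the Hive-table variant mentioned in the update looks like. It is a hedged sketch, not the original code: the table name, column definitions, and config settings are assumptions. It relies on `insertInto` against a partitioned Hive table with dynamic partition overwrite enabled:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("hive-dynamic-overwrite-sketch")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Only the partitions present in the DataFrame should be replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

spark.sql(
  """CREATE TABLE IF NOT EXISTS events (id BIGINT, value STRING)
    |PARTITIONED BY (`date` STRING, `key` STRING)
    |STORED AS PARQUET""".stripMargin)

// Assumed sample data; partition columns must come last to match the table layout,
// because insertInto resolves columns by position, not by name.
val merged = Seq((1L, "2", "2019-01-02", "B"), (2L, "1", "2019-01-03", "C"))
  .toDF("id", "value", "date", "key")

merged.write
  .mode(SaveMode.Overwrite)
  .insertInto("events")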

Answer

In the end I just decided to delete that "green" subset of partitions (the ones that intersect the data already on disk) from HDFS, and use SaveMode.Append instead. I think this is a bug in Spark.
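
A hedged sketch of that workaround, reusing `dataFrame` and `format` from the snippet in the question (and assuming `spark` is the active SparkSession): first delete the partition directories that already exist for the incoming partition values, then write everything with SaveMode.Append so brand-new partitions are simply created. The base path and the date=/key= directory layout are assumptions:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

val basePath = "/data/events"  // assumed base path of the partitioned dataset
val fs = FileSystem.get(new URI(basePath), spark.sparkContext.hadoopConfiguration)

// Partition values present in the (already merged) DataFrame.
val touched = dataFrame.select("date", "key").distinct().collect()

// Delete the matching partition directories if they already exist on HDFS.
touched.foreach { row =>
  val dir = new Path(s"$basePath/date=${row.getString(0)}/key=${row.getString(1)}")
  if (fs.exists(dir)) fs.delete(dir, true)
}

// Append: the overlapping partitions were just removed, and new partitions get created.
dataFrame
  .write
  .mode(SaveMode.Append)
  .partitionBy("date", "key")
  .option("header", "true")
  .format(format)
  .save(basePath)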
