Spark SQL and MySQL - SaveMode.Overwrite not inserting modified data

Question

I have a test table in MySQL with id and name like below:

+----+-------+
| id | name  |
+----+-------+
| 1  | Name1 |
+----+-------+
| 2  | Name2 |
+----+-------+
| 3  | Name3 |
+----+-------+

I am using a Spark DataFrame to read this data (via JDBC) and modifying it like this:

Dataset<Row> modified = sparkSession.sql("select id, concat(name,' - new') as name from test");
modified.write().mode("overwrite").jdbc(AppProperties.MYSQL_CONNECTION_URL,
                "test", connectionProperties);

But my problem is that if I use overwrite mode, it drops the previous table and creates a new table but doesn't insert any data.

I tried the same program by reading from a CSV file (with the same data as the test table) and overwriting. That worked for me.

Am I missing something here?

Thanks!

Answer

The problem is in your code. Because you overwrite the table you're trying to read from, you effectively obliterate all the data before Spark can actually access it.

Remember that Spark is lazy. When you create a Dataset, Spark fetches the required metadata but doesn't load the data, so there is no magic cache that preserves the original content. The data is loaded only when it is actually required, which here is when you execute the write action, and by the time writing starts there is no data left to fetch.

What you need is something like this:

  • Create a Dataset.
  • Apply required transformations and write data to an intermediate MySQL table.

  • TRUNCATE the original input and INSERT INTO ... SELECT from the intermediate table, or DROP the original table and RENAME the intermediate table.
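A rough sketch of that flow is below. It reuses sparkSession, AppProperties.MYSQL_CONNECTION_URL and connectionProperties from the question; the staging table name test_staging is a hypothetical choice, and the final swap runs as plain JDBC SQL outside Spark.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// 1. Build the Dataset with the transformation (lazy - nothing is read yet).
Dataset<Row> modified = sparkSession.sql(
        "select id, concat(name, ' - new') as name from test");

// 2. Write to an intermediate table instead of the table being read.
modified.write()
        .mode("overwrite")
        .jdbc(AppProperties.MYSQL_CONNECTION_URL, "test_staging", connectionProperties);

// 3. Swap the data back with plain JDBC, outside Spark.
try (Connection conn = DriverManager.getConnection(
            AppProperties.MYSQL_CONNECTION_URL, connectionProperties);
     Statement stmt = conn.createStatement()) {
    stmt.executeUpdate("TRUNCATE TABLE test");
    stmt.executeUpdate("INSERT INTO test SELECT * FROM test_staging");
    // or instead: DROP TABLE test; RENAME TABLE test_staging TO test;
} catch (SQLException e) {
    throw new RuntimeException("Swapping the staging table into test failed", e);
}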

An alternative, but less favorable, approach would be:

  • Create a Dataset.
  • Apply required transformations and write data to a persistent Spark table (df.write.saveAsTable(...) or equivalent).
  • TRUNCATE the original input.
  • Read the data back and save it (spark.table(...).write.jdbc(...)).
  • Drop the Spark table.
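A sketch of this variant, again reusing the names from the question; the Spark table name test_backup is hypothetical, and the TRUNCATE of the MySQL table would be done over plain JDBC as in the previous sketch.

// 1. Persist the transformed data as a Spark-managed table.
Dataset<Row> modified = sparkSession.sql(
        "select id, concat(name, ' - new') as name from test");
modified.write().mode("overwrite").saveAsTable("test_backup");

// 2. TRUNCATE the original MySQL table over plain JDBC (not shown here).

// 3. Read the persisted copy back, append it into the now-empty table,
//    then drop the Spark table.
sparkSession.table("test_backup")
        .write()
        .mode("append")
        .jdbc(AppProperties.MYSQL_CONNECTION_URL, "test", connectionProperties);
sparkSession.sql("DROP TABLE test_backup");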

It cannot be stressed enough that using Spark cache / persist is not the way to go. Even with a conservative StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2), cached data can be lost (node failures), leading to silent correctness errors.
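For reference, the pattern being discouraged looks roughly like the following sketch; even forcing materialization with count() does not make the cached copy a reliable substitute for the source table, because cached blocks can still be evicted or lost and would then be recomputed from a table that no longer holds the original data.

import org.apache.spark.storage.StorageLevel;

// Discouraged: cache the Dataset, force it to materialize, then overwrite the source.
Dataset<Row> modified = sparkSession
        .sql("select id, concat(name, ' - new') as name from test")
        .persist(StorageLevel.MEMORY_AND_DISK_2());
modified.count();  // materializes the cache, but blocks can still be lost later
modified.write().mode("overwrite").jdbc(
        AppProperties.MYSQL_CONNECTION_URL, "test", connectionProperties);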
