How to overwrite a parquet file from where DataFrame is being read in Spark


Question

This is a microcosm of the problem I am facing, where I am getting an error. Let me try to reproduce it here.

I am saving a DataFrame as Parquet, but when I reload the DataFrame from the Parquet file and save it again as Parquet, I get an error.

from pyspark.sql.functions import col

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - This produces ERROR   
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

Error message:

executor 22): java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

Another SO question addresses this issue. The proposed solution was to refresh the table, as in the code below, but that did not help. The issue is with refreshing the metadata, and I don't know how to refresh it.

df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
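
Since the data here is loaded from a path rather than a registered table, refreshing by path may be closer to the intent than refreshTable; spark.catalog.refreshByPath is available from Spark 2.2 onwards. A minimal sketch ('.../temp' is the placeholder path from above), with the caveat that refreshing metadata alone does not remove the lazy read's dependency on the very files the overwrite deletes:

# Invalidate cached file-listing metadata for the path itself (Spark 2.2+)
spark.catalog.refreshByPath('.../temp')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')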

Workaround for this problem: a non-elegant way to solve this issue is to save the DataFrame as a Parquet file with a different name, then delete the original Parquet file, and finally rename the new Parquet file to the old name.

# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back as temp1, as opposed to original temp      
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')

# Delete the original parquet file
shutil.rmtree('.../temp')

# Renaming the parquet folder.
os.rename('.../temp1','.../temp')
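
If the Parquet directory is not mounted on the local filesystem, os and shutil cannot touch it. Below is a rough sketch of the same delete-and-rename step through Hadoop's FileSystem API via Spark's JVM gateway; the paths are still the placeholders from above, and this is an untested assumption rather than part of the original workaround:

# Rough sketch: delete and rename on the distributed filesystem via the JVM gateway
hadoop_fs = spark._jvm.org.apache.hadoop.fs
conf = spark._jsc.hadoopConfiguration()
fs = hadoop_fs.FileSystem.get(conf)
fs.delete(hadoop_fs.Path('.../temp'), True)   # recursive delete of the original folder
fs.rename(hadoop_fs.Path('.../temp1'), hadoop_fs.Path('.../temp'))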

But the problem is that some DataFrames are quite big, and this may not be the best way to deal with them. Not to mention that I am not sure whether renaming would cause problems with the metadata.

Answer

One solution for this error is to cache the DataFrame, trigger an action on it (for example, df.show()), and then save the Parquet file in "overwrite" mode. The idea is that caching and materializing the result lets the final write serve the data from the cache instead of re-reading the original Parquet files, which the overwrite deletes before the job completes.

In Python:

df = spark.read.parquet("path_to_parquet")

# ... apply your transformations to df, producing new_df ...
new_df = df  # placeholder for the transformed DataFrame

new_df.cache()
new_df.show()

new_df.write.format("parquet") \
    .mode("overwrite") \
    .save("path_to_parquet")
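
Applied to the original example from the question, the pattern might look like the sketch below. It is only a sketch, under the assumption that count() (rather than show()) forces every partition into the cache so that the final write no longer has to re-read the files it is about to replace:

from pyspark.sql.functions import col

df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex') == 'Male')

# Materialize the filtered data in memory before the source files are touched
df.cache()
df.count()

# The overwrite can now proceed without re-reading the deleted Parquet files
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

If the cached data were evicted before the write finishes, Spark would fall back to re-reading the original files, so for very large DataFrames writing to a separate path (as in the workaround above) remains the safer option.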

