Delete files after processing with Spark Structured Streaming

Question

I am using the file source in Spark Structured Streaming and want to delete the files after I process them.

I am reading in a directory filled with JSON files (1.json, 2.json, etc.) and then writing them out as Parquet files. I want to remove each file after it has been successfully processed.

Answer

编辑2 :更改了我的go脚本以读取源代码. 新脚本

EDIT 2: Changed my go script to read sources instead. new script

EDIT: Currently trying this out, and it might be deleting files before they are processed. I am looking for a better solution and investigating this method.

I solved this temporarily by creating a Go script. It scans the checkpoint folder that I set in Spark and processes the files in it to figure out which files Spark has already written out. It then deletes those files if they exist. It does this every 10 seconds.
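The script itself is not included in the article, but here is a minimal sketch of that approach. It assumes Spark's undocumented file-source log format, where each log file under checkpoint/sources/0/ begins with a version header line (e.g. "v1") followed by one JSON object per line whose "path" field records an ingested file as a URI. The checkpoint path and the file-skipping rules below are illustrative, not taken from the original script.

```go
// checkpoint_cleaner.go — a sketch of the cleanup loop described above.
package main

import (
	"bufio"
	"encoding/json"
	"log"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// sourceEntry mirrors the one field we need from each JSON log line.
// (Assumption: the file-source log records a "path" per ingested file.)
type sourceEntry struct {
	Path string `json:"path"`
}

// cleanOnce reads every batch log under sourceLogDir and deletes the
// input files it finds recorded there.
func cleanOnce(sourceLogDir string) {
	entries, err := os.ReadDir(sourceLogDir)
	if err != nil {
		log.Printf("read %s: %v", sourceLogDir, err)
		return
	}
	for _, e := range entries {
		if e.IsDir() || strings.HasSuffix(e.Name(), ".tmp") {
			continue // skip directories and in-progress temp files
		}
		f, err := os.Open(filepath.Join(sourceLogDir, e.Name()))
		if err != nil {
			log.Printf("open: %v", err)
			continue
		}
		scanner := bufio.NewScanner(f)
		for scanner.Scan() {
			line := scanner.Text()
			if !strings.HasPrefix(line, "{") {
				continue // skip the version header line
			}
			var entry sourceEntry
			if err := json.Unmarshal([]byte(line), &entry); err != nil {
				continue
			}
			// Paths are recorded as URIs, e.g. file:///data/in/1.json.
			u, err := url.Parse(entry.Path)
			if err != nil {
				continue
			}
			if err := os.Remove(u.Path); err != nil && !os.IsNotExist(err) {
				log.Printf("remove %s: %v", u.Path, err)
			}
		}
		f.Close()
	}
}

func main() {
	const sourceLogDir = "/tmp/checkpoint/sources/0" // hypothetical location
	for {
		cleanOnce(sourceLogDir)
		time.Sleep(10 * time.Second)
	}
}
```

As the EDIT above warns, deleting based on the source log alone can still race with in-flight batches, so treat this as a sketch of the idea rather than a safe production mechanism.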

However, this relies on Spark's checkpoint file structure and representation (JSON), which is not documented and could change at any point. I also have not looked through the Spark source code to check whether the files I am reading (checkpoint/sources/0/...) are the real source of truth for processed files. It seems to be working at the moment, though, and it beats doing it manually.
