How to run streaming query on updated lines in CSV file?


Problem Description

I have one CSV file in a folder that is continuously updated. I need to take inputs from this CSV file and produce some transactions. How can I take data from a CSV file that keeps getting updated, let's say every 5 minutes?

I tried the following:

// spark (a SparkSession) and userSchema (a StructType) are defined elsewhere
val csvDF = spark
  .readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("file:///home/location/testFiles")

But the issue is that it monitors the folder for newly created files, whereas my case is a single file that keeps getting updated.

Recommended Answer

I have 1 CSV file in 1 folder location that keeps getting updated. I need to take inputs from this CSV file and produce some transactions. How can I take data from a CSV file that keeps getting updated, let's say every 5 minutes?

tl;dr It won't work.

Spark Structured Streaming by default monitors files in a directory, and for every new file it triggers a computation. Once a file has been processed, it will never be processed again. That's the default implementation.
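To see that behavior end to end, here is a minimal runnable sketch (the two-column schema, local master, and console sink are illustrative assumptions, not from the question): dropping a new file into the watched directory triggers a micro-batch, while appending lines to an already-seen file triggers nothing.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("csv-directory-stream")   // hypothetical app name
  .master("local[*]")
  .getOrCreate()

// Hypothetical schema; substitute your real userSchema.
val userSchema = new StructType()
  .add("id", IntegerType)
  .add("amount", DoubleType)

val csvDF = spark.readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("file:///home/location/testFiles")

// Each micro-batch processes only files that are new since the last batch;
// appends to a file the source has already seen are silently ignored.
val query = csvDF.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()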

You could write your own streaming source that monitors a single file for changes, but that's custom source development (which in most cases is not worth the effort, yet doable).
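If a full custom source is overkill, one common fallback (not part of the answer above, just a workaround sketch under the same assumptions as the previous snippet) is to skip streaming entirely and re-read the file as a plain batch on a schedule, e.g. every 5 minutes as the question asks:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("csv-poller")   // hypothetical app name
  .master("local[*]")
  .getOrCreate()

val userSchema = new StructType()   // hypothetical schema, as above
  .add("id", IntegerType)
  .add("amount", DoubleType)

while (true) {
  // A plain batch read sees the file's current contents, appended lines included.
  val snapshot = spark.read
    .option("sep", ",")
    .schema(userSchema)
    .csv("file:///home/location/testFiles")

  snapshot.show()   // stand-in for "produce some transactions"

  Thread.sleep(5 * 60 * 1000)   // wait 5 minutes before the next poll
}

Note the trade-off: each poll re-reads the whole file, so skipping rows that were already handled in an earlier poll is the caller's responsibility.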
