每次触发后如何更新HDFS文件? [英] How to update HDFS files after every trigger?

查看:154
本文介绍了每次触发后如何更新HDFS文件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将处理后的数据写入HDFS位置.经过长时间的尝试和错误方法后,我正在将数据写入HDFS位置,但是现在的问题是,每当我将新文件添加到我的目录(指向readStream)时,旧文件就会在HDFS位置进行处理和更新

让我说我已经开始流媒体并且我在目录中添加了file-1.csv ...什么都没有写入HDFS位置..然后我已经添加了file-2.csv ..仍然没有将其写入HDFS ,接下来我添加了file-3.csv ..这一次,file-1.csv的已处理文件正在写入HDFS ...

File 1 - no prcess
File 2 - no process
File 3 - process and written file 1 data to HDFS
file4 - process and written file 2 data to HDFS 

即使是结构化流,我也不确定为什么会发生

有人可以解决这个问题吗?

我的输出命令如下:

FetRepo
  .writeStream
  .outputMode("append")
  .partitionBy("data_dt")
  .format("csv")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("path", "hdfs://ffff/apps/hive/warehouse/area.db/fin_repo/")
  .start

解决方案

问自己我多久添加一次文件?"以及这有什么关系 Trigger.ProcessingTime("10 seconds")?使用该配置,您不应期望10秒之内会发生任何事情.

要查看的另一件事是,您使用outputMode("append")只会输出自上次触发以来添加的聚合(组)的行.

来自基本概念 :

追加模式-仅将自上次触发以来在结果表中追加的新行写入外部存储空间.

请注意(引用相同的文档):这仅适用于预期结果表中现有行不会更改的查询."

I am trying to write my processed data to HDFS location. I am getting data writing to the HDFS location after long trial and error methods, BUT the issue now is whenever i add new files to my directory(where i am pointing for readStream) that the old file getting processed and updated in HDFS location

lets say i have started streaming and I have added file-1.csv in my directory...nothing is written to HDFS location..Then i have added file-2.csv..still its not getting written to HDFS , next i have added file-3.csv..this time the processed file of file-1.csv is getting written to HDFS...

File 1 - no prcess
File 2 - no process
File 3 - process and written file 1 data to HDFS
file4 - process and written file 2 data to HDFS 

am not sure why it is happening even it is structured streaming

can somebody how to resolve this issue ?

my output command given below:

FetRepo
  .writeStream
  .outputMode("append")
  .partitionBy("data_dt")
  .format("csv")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("path", "hdfs://ffff/apps/hive/warehouse/area.db/fin_repo/")
  .start

解决方案

Ask yourself "How often do I add the files?" and how does this relate to Trigger.ProcessingTime("10 seconds")? With the configuration you should not expect anything happen within 10 seconds.

Another thing to look at is that you use outputMode("append") that will only output rows for aggregations (groups) that were added since the last trigger.

From Basic Concepts:

Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage.

Please note that (quoting the same document): "This is applicable only on the queries where existing rows in the Result Table are not expected to change."

这篇关于每次触发后如何更新HDFS文件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆