如何在不覆盖的情况下将Spark Streaming输出写入HDFS [英] How to write Spark Streaming output to HDFS without overwriting
问题描述
经过一些处理后,我有了一个DStream [String,ArrayList [String]],因此当我使用saveAsTextFile将其写入hdfs时,每批处理后它都会覆盖数据,因此如何通过附加到以前的结果中来写入新结果
After some processing I have a DStream[String , ArrayList[String]] , so when I am writing it to hdfs using saveAsTextFile and after every batch it overwrites the data , so how to write new result by appending to previous results
output.foreachRDD(r => {
r.saveAsTextFile(path)
})
: 如果有人可以帮助我将输出转换为avro格式,然后通过附加内容写入HDFS
Edit :: If anyone could help me in converting the output to avro format and then writing to HDFS with appending
推荐答案
saveAsTextFile
不支持附加.如果使用固定的文件名进行调用,则每次都会覆盖它.我们可以每次执行 saveAsTextFile(path + timestamp)
保存到一个新文件.这是 DStream.saveAsTextFiles(path)
saveAsTextFile
does not support append. If called with a fixed filename, it will overwrite it every time.
We could do saveAsTextFile(path+timestamp)
to save to a new file every time. That's the basic functionality of DStream.saveAsTextFiles(path)
Parquet是一种支持 append
的易于访问的格式.我们首先将数据RDD转换为 DataFrame
或 Dataset
,然后我们可以从该抽象之上提供的写支持中受益.
An easily accessible format that supports append
is Parquet. We first transform our data RDD to a DataFrame
or Dataset
and then we can benefit from the write support offered on top of that abstraction.
case class DataStructure(field1,..., fieldn)
... streaming setup, dstream declaration, ...
val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)
structuredOutput.foreachRDD(rdd =>
import sparkSession.implicits._
val df = rdd.toDF()
df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")
})
请注意,随着时间的流逝,追加到Parquet文件中会变得更加昂贵,因此仍然需要不时地旋转目标文件.
Note that appending to Parquet files gets more expensive over time, so rotating the target file from time to time is still a requirement.
这篇关于如何在不覆盖的情况下将Spark Streaming输出写入HDFS的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!