Multiple writeStream queries with Spark Streaming
Question
I am working with Spark Streaming and am having trouble running multiple write streams. Below is my code:
DataWriter.writeStreamer(firstTableData, CheckPointConf.firstCheckPoint, OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData, CheckPointConf.secondCheckPoint, OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData, CheckPointConf.thirdCheckPoint, OutputConf.thirdDataOutput)
where writeStreamer is defined as follows:
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {
  val query = input
    .writeStream
    .format("orc")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
  query.awaitTermination()
}
The problem is that only the first table is written by the Spark writeStream; nothing happens for the other tables. Do you have any idea why?
Recommended answer
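query.awaitTermination() should be called only after the last stream has been created. Because the call blocks the calling thread, the first writeStreamer call never returns, so the second and third queries are never started.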
The writeStreamer function can be modified to return a StreamingQuery instead of calling awaitTermination at that point (as it is blocking):
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format("orc")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}
Then you will have:
val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)
query3.awaitTermination()
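One caveat about the last line: query3.awaitTermination() waits only on the third query, so the driver keeps blocking even if query1 or query2 has already stopped or failed. A possible alternative is to block on the session's query manager with spark.streams.awaitAnyTermination(), which returns (or rethrows the failure) as soon as any active query on that session terminates. The following is a minimal sketch rather than the asker's actual job: the object name MultiStreamSketch, the helper startAll, the built-in "rate" source standing in for the three tables, and the /tmp paths are all assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Hypothetical wrapper object, for illustration only.
object MultiStreamSketch {

  // Same shape as the answer's writeStreamer: start the query and
  // return its handle without blocking.
  def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery =
    input.writeStream
      .format("orc")
      .option("checkpointLocation", checkPointFolder)
      .option("path", output)
      .outputMode(OutputMode.Append())
      .start()

  // Hypothetical helper: starts three queries, each with its own
  // checkpoint and output folder under baseDir.
  def startAll(spark: SparkSession, baseDir: String): Seq[StreamingQuery] = {
    // Assumption: the "rate" test source stands in for the real table data.
    val source = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

    Seq("first", "second", "third").map { name =>
      writeStreamer(source, s"$baseDir/checkpoint/$name", s"$baseDir/output/$name")
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multi-write-stream-sketch")
      .master("local[*]")
      .getOrCreate()

    // All three queries are started before anything blocks.
    startAll(spark, "/tmp/multi-write-stream-sketch") // hypothetical path

    // Blocks until any one of the active queries stops or fails.
    spark.streams.awaitAnyTermination()
  }
}
```

Whether to stop the remaining queries once one of them terminates is a design choice; this sketch simply lets main return at that point.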