Spark File Streaming: Get File Names
Question
I need to know the file name of each input file that is streamed from the input directory.
Below is the Spark file streaming code, written in Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.master("local").getOrCreate()
    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    // Create a stream from the folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    // Attempt to print the input file names (this is the call that throws)
    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    // fileNames() // even here it throws the same exception
    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames()
    query.awaitTermination()
  }
}
However, I get the following exception while streaming:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]
Recommended answer
You can use the input_file_name() function, defined in org.apache.spark.sql.functions._, to get the name of the file from which each row was imported into the DataFrame.
sparkSession.readStream.csv(input_dir).withColumn("FileName", input_file_name())
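For context, here is a minimal sketch of how that one-liner might fit into the full streaming job from the question. The object name FileNameStreamExample, the helper addFileName, and the two-column schema (id, value) are assumptions made for illustration and are not part of the original post. The explicit schema is there because streaming file sources generally require one unless spark.sql.streaming.schemaInference is enabled. The call to inputFiles is dropped, since it appears to trigger a batch-style evaluation that a streaming DataFrame does not allow.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FileNameStreamExample {

  // Hypothetical helper: tags every row with the path of the file it came from.
  def addFileName(df: DataFrame): DataFrame =
    df.withColumn("FileName", input_file_name())

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("FileNameStreamExample")
      .getOrCreate()

    val inputDir = "src/main/resources/stream_input"
    val checkpointDir = "src/main/resources/chkpoint_dir"

    // Assumed schema for illustration only; replace with the real CSV layout.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("value", StringType)
    ))

    // Build the streaming DataFrame and add the file name as a regular column.
    val fileStreamDf = addFileName(
      sparkSession.readStream.schema(schema).csv(inputDir)
    )

    // The file name is now part of each row, so it is printed by the sink
    // instead of being requested through inputFiles.
    val query = fileStreamDf.writeStream
      .format("console")
      .option("truncate", "false") // show full file paths
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", checkpointDir)
      .start()

    query.awaitTermination()
  }
}
```

Because FileName is an ordinary column, it can also be filtered, grouped, or written to another sink like any other field.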