Spark File Streaming: Get File Names

Question

I need to get the name of each input file that is streamed from the input directory.

Below is my Spark file-streaming code in Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object FileStreamExample {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder.master("local").getOrCreate()

    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    // create stream from folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    // calling inputFiles on the streaming DataFrame is what raises the exception shown below
    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    // fileNames() // even here it is throwing the same exception
    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames()

    query.awaitTermination()
  }
}

However, I get the following exception while streaming:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]

Recommended answer

You can use the input_file_name() function, defined in org.apache.spark.sql.functions, to get the name of the file from which each row was read into the DataFrame:

sparkSession.readStream.csv(input_dir).withColumn("FileName", input_file_name())
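
For context, here is a minimal sketch of how that one-liner might fit into the full job from the question. It replaces the inputFiles call, which tries to evaluate the streaming plan as a batch query, with a per-row FileName column. The object name FileNameStreamExample and the helper withFileName are hypothetical names introduced for illustration. The sketch also assumes that schema inference is switched on through spark.sql.streaming.schemaInference and that the input directory already contains at least one CSV file when the query starts; supplying an explicit schema instead would work as well.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical object name, used only for this sketch.
object FileNameStreamExample {

  // Hypothetical helper: adds a "FileName" column holding the path of the
  // file each row was read from. Works on batch and streaming DataFrames.
  def withFileName(df: DataFrame): DataFrame =
    df.withColumn("FileName", input_file_name())

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      // Assumption: let Spark infer the CSV schema for the streaming source.
      // This needs at least one file in the directory at start-up.
      .config("spark.sql.streaming.schemaInference", "true")
      .getOrCreate()

    // Same paths as in the question.
    val inputDir = "src/main/resources/stream_input"
    val checkpointDir = "src/main/resources/chkpoint_dir"

    // The file name travels with each row, so no call to inputFiles is needed.
    val query = withFileName(spark.readStream.csv(inputDir))
      .writeStream
      .format("console")
      .option("truncate", "false") // show full file paths in the console sink
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", checkpointDir)
      .start()

    query.awaitTermination()
  }
}
```

Because the file name is an ordinary column, it can be selected, filtered, or grouped on like any other field before the stream is written out.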
