Spark Streaming textFileStream not supporting wildcards


Question

I set up a simple test to stream text files from S3, and got it to work when I tried something like

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/")

and as log files went into the bucket, everything worked fine.

But if there was a subfolder, it would not find any files placed in that subfolder (and yes, I am aware that HDFS doesn't actually use a folder structure)

val input = ssc.textFileStream("s3n://mybucket/2015/04/")

So I tried simply using wildcards, as I have done before in a standard Spark application

val input = ssc.textFileStream("s3n://mybucket/2015/04/*")

But when I try this, it throws an error:

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist.
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
.....

I know for a fact that you can use wildcards when reading file input in a standard Spark application, but it appears that streaming input neither expands wildcards nor automatically processes files in subfolders. Is there something I'm missing here?
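
To make the difference concrete, here is roughly the behaviour I am comparing (a minimal sketch; the paths are placeholders for my bucket layout): the batch API expands the glob, while textFileStream hands the same string to listStatus as a literal path.

// Batch job: the glob is expanded by the underlying Hadoop input format machinery
val batchLogs = sc.textFile("s3n://mybucket/2015/04/*")
println(batchLogs.count())

// Streaming job: the same pattern is treated as a literal directory name,
// so listStatus fails with the FileNotFoundException shown above
val streamLogs = ssc.textFileStream("s3n://mybucket/2015/04/*")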

Ultimately what I need is a streaming job, running 24/7, that monitors an S3 bucket that has logs placed into it by date.

So, something like:

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName>

Is there any way to hand it the topmost folder so that it automatically reads files that show up in any subfolder (since obviously the date will increase every day)?

EDIT:

So, upon digging into the documentation at http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources, it states that nested directories are not supported.

Can anyone shed some light on why this is the case?

Also, since my files will be nested by date, what would be a good way of solving this problem in my streaming application? It's a little complicated because the logs take a few minutes to get written to S3, so the last file written for a given day may end up in the previous day's folder even though we're a few minutes into the new day.

Answer

An "ugly but working" solution can be created by extending FileInputDStream. Writing ssc.textFileStream(d) is equivalent to

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

You can create a CustomFileInputDStream that extends FileInputDStream. The custom class copies the compute method from FileInputDStream and adjusts the findNewFiles method to your needs.

Change the findNewFiles method from:

  private def findNewFiles(currentTime: Long): Array[String] = {
    try {
      lastNewFileFindingTime = clock.getTimeMillis()

      // Calculate ignore threshold
      val modTimeIgnoreThreshold = math.max(
        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
      )
      logDebug(s"Getting new files for time $currentTime, " +
        s"ignoring files older than $modTimeIgnoreThreshold")
      val filter = new PathFilter {
        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
      }
      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
      logInfo("Finding new files took " + timeTaken + " ms")
      logDebug("# cached file times = " + fileToModTime.size)
      if (timeTaken > slideDuration.milliseconds) {
        logWarning(
          "Time taken to find new files exceeds the batch size. " +
            "Consider increasing the batch size or reducing the number of " +
            "files in the monitored directory."
        )
      }
      newFiles
    } catch {
      case e: Exception =>
        logWarning("Error finding new files", e)
        reset()
        Array.empty
    }
  }

to:

  // Note: this version additionally needs scala.collection.mutable.ArrayBuffer and
  // org.apache.hadoop.fs.FileStatus in scope.
  private def findNewFiles(currentTime: Long): Array[String] = {
    try {
      lastNewFileFindingTime = clock.getTimeMillis()

      // Calculate ignore threshold
      val modTimeIgnoreThreshold = math.max(
        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
      )
      logDebug(s"Getting new files for time $currentTime, " +
        s"ignoring files older than $modTimeIgnoreThreshold")
      val filter = new PathFilter {
        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
      }
      // List the first-level subdirectories of the monitored directory...
      val directories = fs.listStatus(directoryPath).filter(_.isDirectory)
      val newFiles = ArrayBuffer[FileStatus]()

      // ...and collect the new files found in each of them
      directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*))

      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
      logInfo("Finding new files took " + timeTaken + " ms")
      logDebug("# cached file times = " + fileToModTime.size)
      if (timeTaken > slideDuration.milliseconds) {
        logWarning(
          "Time taken to find new files exceeds the batch size. " +
            "Consider increasing the batch size or reducing the number of " +
            "files in the monitored directory."
        )
      }
      newFiles.map(_.getPath.toString).toArray
    } catch {
      case e: Exception =>
        logWarning("Error finding new files", e)
        reset()
        Array.empty
    }
  }

This will check for files in all first-degree subfolders; you can adjust it to use the batch timestamp in order to access only the relevant "subdirectories".
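
For example, a minimal sketch of that adjustment could look like the following (this is only an illustration: the yyyy/MM/dd layout matches the bucket structure from the question, the dirsForTime name is made up, and directoryPath and fs are assumed to be fields of the custom class, just like in the copied findNewFiles above):

  // Sketch only: map a batch timestamp to the dated subdirectories worth scanning,
  // assuming the s3n://mybucket/<YEAR>/<MONTH>/<DAY>/ layout from the question.
  private val dayFormat = new java.text.SimpleDateFormat("yyyy/MM/dd")

  private def dirsForTime(currentTime: Long): Seq[Path] = {
    val today = dayFormat.format(new java.util.Date(currentTime))
    // Also look at the previous day's folder, because late log files for the old day
    // can still arrive a few minutes after midnight (as described in the question).
    val yesterday = dayFormat.format(new java.util.Date(currentTime - 24L * 60 * 60 * 1000))
    Seq(today, yesterday).distinct.map(day => new Path(directoryPath, day))
  }

Inside findNewFiles you would then list only dirsForTime(currentTime) instead of every subdirectory under directoryPath, which keeps the number of S3 listings per batch constant as the number of date folders grows.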

I created the CustomFileInputDStream as described and activated it by calling:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

It seems to behave as we expected.
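
For completeness, this is roughly how the call above is wired into a driver (a sketch: the app name, batch interval and bucket path are placeholders, and it assumes CustomFileInputDStream keeps the same constructor parameters as FileInputDStream and is visible from your application code):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("nested-s3-logs")        // placeholder app name
val streamingContext = new StreamingContext(conf, Seconds(60)) // placeholder batch interval

// Drop-in replacement for streamingContext.textFileStream(...), pointed at the top-level folder
val input = new CustomFileInputDStream[LongWritable, Text, TextInputFormat](
  streamingContext, "s3n://mybucket/").map(_._2.toString)

input.print()
streamingContext.start()
streamingContext.awaitTermination()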

When writing a solution like this, I have to add a few points for consideration:


  • You are breaking Spark's encapsulation and creating a custom class that you will have to maintain on your own as time passes.

  • I believe a solution like this is a last resort. If your use case can be implemented in a different way, it is usually better to avoid a solution like this.

  • If you have a lot of "subdirectories" on S3 and have to check each one of them, it will cost you.

It would be very interesting to understand whether Databricks doesn't support nested directories just because of a possible performance penalty, or whether there is a deeper reason I haven't thought about.
