Spark Streaming textFileStream watch output of RDD.saveAsTextFile

Question

Running Spark 1.6.2 (YARN mode)

Firstly, I have some code from this post to get filenames within Spark Streaming, so that could be the issue, but hopefully not.

Basically, I have this first job.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Watch a directory for new text files; map to the file contents so the result is a DStream[String]
def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
    ssc.fileStream[LongWritable, Text, TextInputFormat](dir).map(_._2.toString)
}

val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))

val inputDir = "hdfs:///tmp/input"
val outputDir = "hdfs:///tmp/output1"

val stream1 = getStream(ssc, inputDir)
stream1.foreachRDD(rdd => rdd.saveAsTextFile(outputDir))

ssc.start()
ssc.awaitTermination()

And I also have a second job that, for this example, looks practically identical; its inputDir is the first job's outputDir, and it writes to a new outputDir = "hdfs:///tmp/output2" (sketched just below).
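
A minimal sketch of that second job, reusing the getStream helper from above (the exact wiring here is my assumption; only the directories differ from the first job):

// Hypothetical second streaming job (a separate application):
// it watches the first job's output directory and writes to output2.
val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))

val inputDir = "hdfs:///tmp/output1"   // the first job's outputDir
val outputDir = "hdfs:///tmp/output2"

val stream2 = getStream(ssc, inputDir)
stream2.foreachRDD(rdd => rdd.saveAsTextFile(outputDir))

ssc.start()
ssc.awaitTermination()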

Anyway, so I have to start the second streaming job before the first job because it needs to watch for new files. Makes sense...

Then I start the first job and hadoop fs -copyFromLocal some files into the input folder, since per the API:

Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
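
As an aside, here is a hedged sketch of what "moving" a file into the watched directory could look like with the Hadoop FileSystem API (the staging path and helper name are only illustrative); a rename within the same file system makes the file appear in the directory in a single step:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative helper: stage the file elsewhere on the same HDFS,
// then rename it into the watched input directory so the stream
// sees a complete file appear in one step.
def moveIntoWatchedDir(stagedFile: String, watchedDir: String): Boolean = {
  val fs = FileSystem.get(new Configuration())
  val src = new Path(stagedFile)                // e.g. hdfs:///tmp/staging/data.txt
  val dst = new Path(watchedDir, src.getName)   // e.g. hdfs:///tmp/input/data.txt
  fs.rename(src, dst)                           // a "move" within the same file system
}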

When I try to run this, it eventually crashes with a stacktrace that contains this

17/02/01 11:48:35 INFO FileInputDStream: Finding new files took 7 ms
17/02/01 11:48:35 INFO FileInputDStream: New files at time 1485949715000 ms:
hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 355.9 KB, free 356.8 KB)
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.9 KB, free 385.7 KB)
17/02/01 11:48:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43097 (size: 28.9 KB, free: 511.1 MB)
17/02/01 11:48:35 INFO SparkContext: Created broadcast 1 from fileStream at FileStreamTransformer.scala:45
17/02/01 11:48:35 ERROR JobScheduler: Error generating jobs for time 1485949715000 ms
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/output1/_SUCCESS
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266)
  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:251)
  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276)
  at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266)
  at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
  at scala.Option.orElse(Option.scala:257)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:251)
  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/02/01 11:48:35 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
17/02/01 11:48:35 INFO JobGenerator: Stopping JobGenerator immediately
17/02/01 11:48:35 INFO RecurringTimer: Stopped timer for JobGenerator after time 1485949715000
17/02/01 11:48:35 INFO JobGenerator: Stopped JobGenerator
17/02/01 11:48:35 INFO JobScheduler: Stopped JobScheduler
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/streaming,null}
17/02/01 11:48:35 INFO StreamingContext: StreamingContext stopped successfully
17/02/01 11:48:35 INFO SparkContext: Invoking stop() from shutdown hook
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
17/02/01 11:48:35 INFO SparkUI: Stopped Spark web UI at http://172.17.0.2:4040
17/02/01 11:48:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/02/01 11:48:35 INFO MemoryStore: MemoryStore cleared
17/02/01 11:48:35 INFO BlockManager: BlockManager stopped
17/02/01 11:48:35 INFO BlockManagerMaster: BlockManagerMaster stopped
17/02/01 11:48:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/02/01 11:48:35 INFO SparkContext: Successfully stopped SparkContext
17/02/01 11:48:35 INFO ShutdownHookManager: Shutdown hook called
17/02/01 11:48:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-85bb28ad-e3e1-4b2a-8795-04ac1c6a0ea5
17/02/01 11:48:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-85bb28ad-e3e1-4b2a-8795-04ac1c6a0ea5/httpd-65e6e9f0-dcb8-4b66-86f6-f775e2e497c0
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

And I know that the _SUCCESS file is written by rdd.saveAsTextFile, so that's not the issue, but my questions are as follows:

  1. The file does exist. I can see it using hadoop fs -ls.
  2. Even if the file didn't exist, the API is designed to pick up new files. Why is it being read?
  3. That file is empty, so why should it be processed anyway?
  4. Is this even possible? Can Spark Streaming watch the output of another Spark job?

Answer

To explicitly enforce that only new files are processed, and to ensure marker files like _SUCCESS are skipped, we can use the following signature of fileStream:

def getStream(ssc: StreamingContext, dir: String): DStream[String] = {
   ssc.fileStream[LongWritable, Text, TextInputFormat](dir,
      // keep only paths whose name does not start with "_" (e.g. _SUCCESS) or "."
      (path: org.apache.hadoop.fs.Path) => !path.getName.startsWith("_") && !path.getName.startsWith("."),
      newFilesOnly = true).map(_._2.toString)
}

newFilesOnly defaults to true when not specified, as shown here. So ideally _SUCCESS should not have been processed in your setup either.
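
As a quick sanity check of the filter above (assuming the intent is "skip names starting with _ or ."), the predicate can be exercised on a few example paths:

import org.apache.hadoop.fs.Path

// The same predicate as in getStream, pulled out so it can be tried on sample paths.
val keep = (path: Path) =>
  !path.getName.startsWith("_") && !path.getName.startsWith(".")

assert(!keep(new Path("hdfs:///tmp/output1/_SUCCESS")))   // marker file: skipped
assert(!keep(new Path("hdfs:///tmp/output1/.hidden")))    // hidden file: skipped
assert(keep(new Path("hdfs:///tmp/output1/part-00000")))  // data file: processed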
