Spark Streaming: getting file names from a DStream RDD
Question
Spark Streaming's textFileStream and fileStream can monitor a directory and process the new files in a DStream RDD.
How can I get the names of the files that the DStream RDD is processing in a particular batch interval?
Answer
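fileStream produces a UnionRDD of NewHadoopRDDs, one per new file in the batch. The good part about NewHadoopRDDs created by sc.newAPIHadoopFile is that their names are set to their paths, so each file's path can be read back from the name of the corresponding RDD among the union's dependencies.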
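The naming behaviour can be checked in isolation with a minimal batch-mode sketch. It assumes spark-core (with its Hadoop dependencies) on the classpath and a local master. It writes two temporary files (the names a.txt and b.txt are arbitrary), reads each one with sc.newAPIHadoopFile, wraps them in a UnionRDD, which is the per-batch structure the answer attributes to fileStream, and then reads the names back through the union's dependencies. The object NamedHadoopRddDemo and the helper unionOfTempFiles are hypothetical names used only for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.{RDD, UnionRDD}

// Hypothetical demo object; requires spark-core on the classpath.
object NamedHadoopRddDemo {

  // Writes two one-line temp files, reads each with sc.newAPIHadoopFile and unions
  // the per-file RDDs, mirroring one fileStream batch. Returns (paths, union).
  def unionOfTempFiles(sc: SparkContext): (Seq[String], RDD[(LongWritable, Text)]) = {
    val dir = Files.createTempDirectory("named-rdd-demo")
    val paths = Seq("a.txt", "b.txt").map { name =>
      val p = dir.resolve(name)
      Files.write(p, s"a line from $name\n".getBytes(StandardCharsets.UTF_8))
      p.toString
    }
    // newAPIHadoopFile names each NewHadoopRDD after the path it was given.
    val perFile = paths.map(p => sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](p))
    (paths, new UnionRDD(sc, perFile))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Named RDD demo").setMaster("local[1]"))
    try {
      val (paths, union) = unionOfTempFiles(sc)
      println(paths)
      // Each dependency of the UnionRDD points at one per-file child RDD.
      println(union.dependencies.map(_.rdd.name))
    } finally {
      sc.stop()
    }
  }
}
```

Both println calls should show the same list of paths, which is the property the streaming helpers rely on.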
Here's an example of what you can do with that knowledge:
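import scala.reflect.ClassTag

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object ProcessByFile {

  // Like ssc.textFileStream, but keeps one child RDD per file, each named after its path.
  def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
    ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
      .transform { rdd =>
        // Each dependency of the batch's UnionRDD is one file's NewHadoopRDD.
        new UnionRDD(rdd.context,
          rdd.dependencies.map { dep =>
            dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]]
              .map(_._2.toString)
              .setName(dep.rdd.name) // carry the file path over to the mapped RDD
          }
        )
      }

  // Applies transformFunc(filename) to every non-empty per-file RDD and unions the results.
  def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                   transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
    new UnionRDD(unionrdd.context,
      unionrdd.dependencies.map { dep =>
        // isEmpty may launch a small Spark job for each file in the batch.
        if (dep.rdd.isEmpty) None
        else {
          val filename = dep.rdd.name
          Some(
            transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
              .setName(filename)
          )
        }
      }.flatten
    )
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Process by file")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30))
    val dstream = namedTextFileStream(ssc, "/some/directory")

    // Pairs every line with the path of the file it came from.
    def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
      rdd.map(line => (filename, line))

    val transformed = dstream
      .transform(rdd => transformByFile(rdd, byFileTransformer))

    // Do some stuff with transformed. At least one output operation must be
    // registered before ssc.start(), for example:
    transformed.print()

    ssc.start()
    ssc.awaitTermination()
  }
}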
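If you only need to know which files each batch contained (for logging, say) rather than transforming each file separately, the same observation can be used without rebuilding the union. This is a minimal sketch, assuming spark-streaming on the classpath; LogBatchFiles, batchFileNames and logFileNames are hypothetical names, and /some/directory is the placeholder directory used in the example above.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical object; requires spark-streaming on the classpath.
object LogBatchFiles {

  // Paths of the files in one fileStream batch: the batch RDD is a UnionRDD whose
  // dependencies are the per-file NewHadoopRDDs, each named after its path.
  def batchFileNames(batch: RDD[_]): Seq[String] =
    batch.dependencies.map(_.rdd.name)

  // Prints the file names of every batch; the foreachRDD function runs on the driver.
  def logFileNames[T](stream: DStream[T]): Unit =
    stream.foreachRDD { (rdd: RDD[T], time: Time) =>
      println(s"Batch at $time: ${batchFileNames(rdd).mkString(", ")}")
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Log batch files").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30))
    // Attach to the raw fileStream output (foreachRDD also counts as an output operation).
    logFileNames(ssc.fileStream[LongWritable, Text, TextInputFormat]("/some/directory"))
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The helper has to be attached to the raw fileStream output. After a map, the batch RDD is a MapPartitionsRDD whose only dependency is the union itself, so its dependency names no longer carry the paths. That is why namedTextFileStream rebuilds a UnionRDD of named children instead of mapping the stream directly.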