In Spark Streaming, how to reload a lookup (non-stream) RDD after n batches
Question
Suppose I have a streaming context that performs many steps, and at the end each micro-batch looks up or joins against a preloaded RDD. I have to refresh that preloaded RDD every 12 hours. How can I do this? To my understanding, anything I do that is not tied to the streaming context is not replayed, so how do I get this called from one of the streaming RDDs? I need to make only one call, no matter how many partitions the streaming DStream has.
Answer
This is possible by re-creating the external RDD at the time it needs to be reloaded. It requires defining a mutable variable to hold the RDD reference that is active at a given moment. Within dstream.foreachRDD we can then check for the moment when the RDD reference needs to be refreshed.
Here is an example of how that would look:
val stream: DStream[Int] = ??? // let's say that we have some DStream of Ints

// Some external data as an RDD of (x, x)
def externalData(): RDD[(Int, Int)] = sparkContext.textFile(dataFile)
  .flatMap { line => try { Some((line.toInt, line.toInt)) } catch { case ex: Throwable => None } }
  .cache()

// this mutable var will hold the reference to the external data RDD
var cache: RDD[(Int, Int)] = externalData()

// force materialization - useful for experimenting, not needed in reality
cache.count()

// a var to count iterations -- used to trigger the reload in this example
var tick = 1

stream.foreachRDD { rdd =>
  if (tick % 5 == 0) { // will reload the RDD every 5 iterations
    // unpersist the previous RDD, otherwise it will linger in memory, taking up resources
    cache.unpersist(false)
    // generate a new RDD
    cache = externalData()
  }
  // join the DStream RDD with our reference data, do something with it...
  val matches = rdd.keyBy(identity).join(cache).count()
  updateData(dataFile, (matches + 1).toInt) // adding data to the static file in order to see when the new records become alive
  tick = (tick + 1) % 1000
}

streaming.start
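The example above reloads every 5 iterations for demonstration purposes, while the question asks for a refresh every 12 hours. The iteration counter can be swapped for a wall-clock check inside foreachRDD. A minimal sketch of that time-based trigger, assuming names of my own choosing (needsReload, lastReloadMs, and the 12-hour constant are illustrative, not from the original answer):

```scala
// Decides whether the cached RDD should be rebuilt, based on wall-clock time.
// lastReloadMs: epoch millis of the last reload; intervalMs: refresh period.
def needsReload(lastReloadMs: Long, intervalMs: Long, nowMs: Long): Boolean =
  nowMs - lastReloadMs >= intervalMs

val twelveHoursMs: Long = 12L * 60 * 60 * 1000

// Inside foreachRDD, the `tick % 5 == 0` check would become something like:
// if (needsReload(lastReloadMs, twelveHoursMs, System.currentTimeMillis())) {
//   cache.unpersist(false)
//   cache = externalData()
//   lastReloadMs = System.currentTimeMillis()
// }

// Quick check of the predicate itself:
println(needsReload(0L, twelveHoursMs, twelveHoursMs))     // true
println(needsReload(0L, twelveHoursMs, twelveHoursMs - 1)) // false
```

The predicate is evaluated once per micro-batch on the driver (foreachRDD runs on the driver), so the reload happens in exactly one place regardless of how many partitions the streaming DStream has.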
Before arriving at this solution, I studied the possibility of playing with the persist flag on the RDD, but it didn't work as expected. It looks like unpersist() does not force re-materialization of the RDD when it is used again.