In Spark Streaming, how to reload a lookup non-stream RDD after n batches


Problem description


Suppose I have a streaming context that does a lot of steps, and at the end each micro-batch looks up or joins against a preloaded RDD. I have to refresh that preloaded RDD every 12 hours. How can I do this? To my understanding, anything I do that is not tied to the streaming context is not replayed, so how do I get this refresh called from one of the streaming RDDs? I need to make only one call, no matter how many partitions the streaming DStream has.
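To make the setup concrete, here is a rough sketch of the scenario being described (the names events, lookupFile and the join key are hypothetical), assuming a DStream of Int keys joined against a lookup RDD that is loaded once from a file:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def scenario(sc: SparkContext, events: DStream[Int]): Unit = {
  // loaded once on the driver; the question is how to refresh this every 12 hours
  val lookup: RDD[(Int, String)] = sc.textFile("lookupFile")
    .map(line => (line.hashCode, line))
    .cache()

  events.foreachRDD { rdd =>
    // every micro-batch joins against the same preloaded RDD
    val enriched = rdd.keyBy(identity).join(lookup)
    enriched.count() // stand-in for whatever the batch actually does with the result
  }
}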

Recommended answer


This is possible by re-creating the external RDD at the time it needs to be reloaded. It requires defining a mutable variable to hold the RDD reference that is active at a given moment in time. Within dstream.foreachRDD we can then check for the moment when the RDD reference needs to be refreshed.


This is an example of how that would look:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

val sparkContext: SparkContext = ???          // the active SparkContext
val streamingContext: StreamingContext = ???  // the active StreamingContext
val dataFile: String = ???                    // path to the reference data file
val stream: DStream[Int] = ???                // let's say that we have some DStream of Ints
def updateData(file: String, n: Int): Unit = ??? // test helper that appends records to dataFile

// Some external data as an RDD of (x, x)
def externalData(): RDD[(Int, Int)] = sparkContext.textFile(dataFile)
  .flatMap { line => try { Some((line.toInt, line.toInt)) } catch { case ex: Throwable => None } }
  .cache()

// this mutable var will hold the reference to the external data RDD
var cache: RDD[(Int, Int)] = externalData()
// force materialization - useful for experimenting, not needed in reality
cache.count()
// a var to count iterations -- used to trigger the reload in this example
var tick = 1

stream.foreachRDD { rdd =>
  if (tick % 5 == 0) { // will reload the RDD every 5 iterations
    // unpersist the previous RDD, otherwise it will linger in memory, taking up resources
    cache.unpersist(false)
    // generate a new RDD
    cache = externalData()
  }
  // join the DStream RDD with our reference data, do something with it...
  val matches = rdd.keyBy(identity).join(cache).count()
  // adding data to the static file in order to see when the new records become alive
  updateData(dataFile, (matches + 1).toInt)
  tick = (tick + 1) % 1000
}
streamingContext.start()


Before coming up with this solution, I studied the possibility of playing with the persist flag on the RDD, but it didn't work as expected. It looks like unpersist() does not force re-materialization of the RDD when it's used again.
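For reference, a rough sketch of that first idea (it reuses the stream, cache and tick definitions from the example above, so it is not self-contained): it keeps a single RDD reference and only toggles its persistence, which would only work if unpersist() forced the RDD to be re-materialized from the file on its next use.

stream.foreachRDD { rdd =>
  if (tick % 5 == 0) {
    cache.unpersist(false) // drop the cached blocks...
    cache.cache()          // ...but keep the same reference and lineage
  }
  rdd.keyBy(identity).join(cache).count()
  tick = (tick + 1) % 1000
}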
