How to update an RDD periodically in Spark Streaming


Problem description

My code is as follows:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 30)

initRDD = sc.parallelize('path_to_data')
lines = ssc.socketTextStream('localhost', 9999)
res = lines.transform(lambda x: x.join(initRDD))

res.pprint()

My question is that initRDD needs to be refreshed every day at midnight.

I tried it this way:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 30)

lines = ssc.socketTextStream('localhost', 9999)


def func(rdd):
    # transform() evaluates this function once per batch,
    # so the lookup RDD is rebuilt here every 30 seconds.
    initRDD = rdd.context.parallelize('path_to_data')
    return rdd.join(initRDD)


res = lines.transform(func)

res.pprint()

But it seems that initRDD would then be rebuilt every 30 seconds, the same as the batchDuration, rather than once a day.

Is there an ideal way to do this?

Answer

One option would be to check for a deadline before the transform. The check is a simple comparison, and hence cheap to do at each batch interval:

import java.time.{LocalDate, ZoneOffset}

def nextDeadline(): Long = {
  // Assumes the cutoff is midnight in the UTC timezone.
  LocalDate.now.atStartOfDay().plusDays(1).toInstant(ZoneOffset.UTC).toEpochMilli()
}

// Note these are mutable variables!
var initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")
var _nextDeadline = nextDeadline()

val lines = ssc.socketTextStream("localhost", 9999)

// We use foreachRDD as a scheduling trigger.
// We don't use the data, only the execution hook.
lines.foreachRDD { _ =>
  if (System.currentTimeMillis > _nextDeadline) {
    initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")
    _nextDeadline = nextDeadline()
  }
}

// If the reference was updated, it will be picked up in this stage.
val res = lines.transform(rdd => rdd.join(initRDD))
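
Since the question uses PySpark, here is a rough sketch of the same deadline-check idea in Python. It assumes the lookup data lives in a text file at the question's placeholder path 'path_to_data', and that both sides are keyed with map(lambda l: (l, 1)), because join in Spark only works on pair RDDs; next_deadline, load_lookup, maybe_reload, and the state dict are illustrative names, not part of the original answer:

from datetime import datetime, timedelta, timezone

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 30)


def next_deadline():
    # Next midnight in UTC, as epoch milliseconds.
    tomorrow = datetime.now(timezone.utc).date() + timedelta(days=1)
    midnight = datetime.combine(tomorrow, datetime.min.time(), tzinfo=timezone.utc)
    return midnight.timestamp() * 1000


def load_lookup():
    # Placeholder loader; key the records so they can be joined.
    return sc.textFile('path_to_data').map(lambda l: (l, 1)).cache()


# Mutable driver-side state shared by the closures below.
state = {'lookup': load_lookup(), 'deadline': next_deadline()}

lines = ssc.socketTextStream('localhost', 9999)


def maybe_reload(rdd):
    # foreachRDD runs on the driver once per batch; the data is
    # ignored, it is only used as an execution hook.
    now_ms = datetime.now(timezone.utc).timestamp() * 1000
    if now_ms > state['deadline']:
        state['lookup'].unpersist()
        state['lookup'] = load_lookup()
        state['deadline'] = next_deadline()


lines.foreachRDD(maybe_reload)

# transform's function is also evaluated per batch, so it always
# sees the latest value of state['lookup'].
res = lines.transform(lambda rdd: rdd.map(lambda l: (l, 1)).join(state['lookup']))
res.pprint()

ssc.start()
ssc.awaitTermination()

Caching the lookup RDD and unpersisting the old one on reload keeps the daily refresh from re-reading the file on every batch.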

