Spark streaming: Cache DStream results across batches


Question


Using Spark Streaming (1.6) I have a file stream for reading lookup data with a 2s batch size; however, files are copied to the directory only every hour.
Once there's a new file, its content is read by the stream. This is what I want to cache in memory and keep there until new files are read.
There's another stream to which I want to join this dataset, which is why I'd like to cache it.


This is a follow-up question to Batch lookup data for Spark streaming.
The answer does work fine with updateStateByKey; however, I don't know how to deal with cases where a KV pair is deleted from the lookup files, since the sequence of values in updateStateByKey keeps growing. Also, any hint on how to do this with mapWithState would be great.


This is what I tried so far, but the data doesn't seem to be persisted:

val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD { x =>
  if (!x.partitions.isEmpty) {
    x.unpersist(true)
    x.persist()
  }
}

Answer


DStreams can be persisted directly using the persist method, which persists every RDD in the stream:

dictionaryStream.persist

According to the official documentation, this is automatically applied to

window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey


so there should be no need for explicit caching in your case. There is also no need for manual unpersisting. To quote the docs once again:


by default, all input data and persisted RDDs generated by DStream transformations are automatically cleared


and a retention period is tuned automatically based on the transformations which are used in the pipeline.


Regarding mapWithState, you'll have to provide a StateSpec. A minimal example requires a function which takes the key, an Option of the current value, and the previous state. Let's say you have a DStream[(String, Double)] and you want to record the maximum value seen so far:

val state = StateSpec.function(
  (key: String, current: Option[Double], state: State[Double]) => {
    // Keep the larger of the incoming value and the stored one.
    val max = Math.max(
      current.getOrElse(Double.MinValue),
      state.getOption.getOrElse(Double.MinValue)
    )
    state.update(max)
    (key, max)
  }
)

val inputStream: DStream[(String, Double)] = ??? 
inputStream.mapWithState(state).print()


It is also possible to provide an initial state, a timeout interval, and to capture the current batch time. The last two can be used to implement a removal strategy for keys which haven't been updated for some period of time.
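The bookkeeping that such a timeout performs can be illustrated in plain Scala without a Spark cluster. The following is a minimal sketch, not Spark's actual implementation: all names (Entry, applyBatch) are illustrative. It mimics the semantics of a per-key timeout, where each key remembers when it was last updated and idle keys are evicted, while keeping the running maximum per key as in the example above:

```scala
// Plain-Scala illustration of timeout-based key removal.
// Each key stores its value plus the batch time of its last update;
// keys idle longer than `timeout` are dropped from the state.
case class Entry(value: Double, lastUpdated: Long)

def applyBatch(
    state: Map[String, Entry],
    batch: Seq[(String, Double)],
    batchTime: Long,
    timeout: Long): Map[String, Entry] = {
  // Merge the new batch, keeping the running maximum per key.
  val updated = batch.foldLeft(state) { case (acc, (k, v)) =>
    val max = acc.get(k).map(e => math.max(e.value, v)).getOrElse(v)
    acc.updated(k, Entry(max, batchTime))
  }
  // Evict keys that have not been updated within the timeout window.
  updated.filter { case (_, e) => batchTime - e.lastUpdated <= timeout }
}
```

For example, a key that appears in batch 0 but not afterwards survives while its idle time stays within the timeout and is then evicted, while keys refreshed by later batches are kept. In Spark itself this corresponds to attaching a timeout to the StateSpec; a timed-out key is passed to the state function one last time before removal, and the state can only be read there, not updated.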
