Filter partial duplicates with mapWithState in Spark Streaming


Question

We have a DStream, such as

// Kafka 0.10 direct stream imports (spark-streaming-kafka-0-10)
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sc, Seconds(1))

val kS = KafkaUtils.createDirectStream[String, TMapRecord](
  ssc,
  PreferConsistent,
  Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
  mapPartitions(part => {
    part.map(_.value())
  }).
  mapPartitions(part1 => {
    part1.map(c => {
      TMsg(1,
        c.field1,
        c.field2, // and other fields
        c.startTimeSeconds
      )
    })
  })

So each RDD has a bunch of TMsg objects with some (technical) key fields I can use to deduplicate the DStream. Basically, if we have two TMsg objects, in one or two discretized RDDs, with the same field1 and field2, and they differ by less than 1 second (looking at startTimeSeconds), it's a duplicate.
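
For concreteness, here is a minimal sketch of the record type and the duplicate test described above; field1, field2 and startTimeSeconds come from the snippet, the remaining field name is hypothetical:

case class TMsg(kind: Int, // hypothetical name for the first field
                field1: String,
                field2: String,
                startTimeSeconds: Long)

// Two messages are duplicates when both key fields match and the
// timestamps are less than one second apart.
def isDuplicate(a: TMsg, b: TMsg): Boolean =
  a.field1 == b.field1 &&
    a.field2 == b.field2 &&
    math.abs(a.startTimeSeconds - b.startTimeSeconds) < 1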

I looked at mapWithState. Yes, I can create a K -> V DStream like

val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)

So I can use that function, but I don't understand how I can use it to filter duplicates.

Window functions can't help, and I can't use Structured Streaming's deduplicate function since the solution is written with DStreams.
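
For reference only, the Structured Streaming feature alluded to is dropDuplicates on a streaming Dataset, which is not available for DStreams. A rough fragment, assuming a parsed streaming DataFrame streamingDf with field1, field2 and an eventTime timestamp column; note it matches exact column values rather than a "within one second" rule:

// Not applicable to this DStream-based job; shown only for contrast.
val dedupedDf = streamingDf
  .withWatermark("eventTime", "1 second") // bounds how long dedup state is kept
  .dropDuplicates("field1", "field2")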

Is there a solution? Thanks.

P.S. Spark version is 2.2

Answer

You could use mapWithState. There is a good manual on how to use Stateful Streaming. In your case you could:

1. Set up checkpointing:

val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("path/to/persistent/storage")
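
(mapWithState keeps per-key state across batches, so a checkpoint directory is mandatory; the streaming context will refuse to start without one.)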

2. Define the update function:

def update(key: (String, String),
           value: Option[Int],
           state: State[Int]): Option[((String, String), Int)] = {
  (value, state.getOption()) match {
    // state already exists for this key: the value is a duplicate
    case (Some(_), Some(_)) => None
    case (Some(v), _) =>
      // you can update the state with any value you want;
      // it is just a marker that the value is not new
      state.update(v)
      Option((key, v))
    case (_, _) if state.isTimingOut() => None
  }
}
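
In other words: the first occurrence of a key is emitted and recorded in the state; any further occurrence within the timeout window hits the first case and is dropped; once the state has timed out, the same key can be emitted again, which gives the "differ by less than 1 second" behaviour.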

3. Create the state spec:

val stateSpec =
  StateSpec
    .function(update _)
    // it is important to define how long you want
    // to check for duplicates; in this example the
    // check interval is 1 second
    .timeout(Seconds(1))

4. Use it:

kS
  // make key -> value pairs
  .map(m => (m.field1, m.field2) -> m.startTimeSeconds)
  .mapWithState(stateSpec)
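
Because the mapping function returns an Option, the resulting stream consists of Option values. A small sketch of unwrapping them and wiring up the job (the flatMap, output action and start calls are assumptions, not part of the original answer):

val deduped = kS
  .map(m => (m.field1, m.field2) -> m.startTimeSeconds)
  .mapWithState(stateSpec)
  .flatMap(o => o) // drop the Nones, keep the first-seen (key, timestamp) pairs

deduped.print()
ssc.start()
ssc.awaitTermination()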

If you want to take the last of the values instead, the update function could be:

def update(key: (String, String),
           value: Option[Int],
           state: State[Int]): Option[((String, String), Int)] = {
  (value, state.getOption()) match {
    case (Some(v), _) =>
      // remember the most recent value; emit nothing yet
      state.update(v)
      None
    case (_, _) if state.isTimingOut() =>
      // value is None when a key times out, so emit the last
      // value remembered in the state instead of value.get
      state.getOption().map(s => (key, s))
  }
}
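
Note that with this variant nothing is emitted while values keep arriving for a key; each key produces a single record roughly one second after its state was last updated, so the output is delayed by the timeout interval.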
