How to remove duplicates (more like filter based on multiple properties) with Spark RDD in Scala?


Problem description


As a policy, we do not update our documents; instead, we recreate them with updated values. When I process the events, I would like to keep only the updated ones, so I would like to filter items out of my RDD based on multiple values. For instance, say an item would be:

{
    "name": "Sample",
    "someId": "123",
    "createdAt": "2016-09-21T02:16:32+00:00"
}

and when updated:

{
    "name": "Sample-Updated",
    "someId": "123", # This remains the same
    "createdAt": "2016-09-21T03:16:32+00:00" # This is later than the one above, since the update is done after the document is generated
}

What I have been doing is:

items = items.toList
  .sortBy(_.createdAt).reverse

items = items
  .groupBy(_.someId)
  .map(_._2.head)(breakOut)


but this obviously converts the RDD into a list; end of Spark. How do I achieve this?

Update


So far, this is what I have put together by looking into the comments, but no luck when adding into the set:

// Is this correct? (1)
val initialSet = sc.parallelize(List[(String, Event)]())

val addToSet = (eventSet: RDD[(String, Event)],
                event: Event) => {
    // What to do here? (2)
}

// Is this correct? (3)
val mergeSets = (p1: RDD[(String, Event)],
                 p2: RDD[(String, Event)]) => p1.union(p2)

// resultSet is of type RDD[(String, RDD[(String, Event)])]. How to get it as RDD[(String, Event)]? (4)
val resultSet = initialSet.aggregateByKey(initialSet)(addToSet, mergeSets)
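
As a side note on questions (1)-(4): the zero value passed to aggregateByKey must be a plain serializable value, not an RDD; RDDs cannot be nested inside other RDD operations. A minimal sketch under that assumption, reusing the hypothetical Event case class from above and the asker's function names, keeping only the newest event per key:

import org.apache.spark.rdd.RDD

// Sketch addressing (1)-(4): the accumulator is Option[Event]
// ("no event seen yet"), not an RDD.
def latestPerId(events: RDD[(String, Event)]): RDD[(String, Event)] = {
  val zero: Option[Event] = None

  // (2) Fold a single event into the accumulator, keeping the newest.
  val addToSet = (acc: Option[Event], event: Event) =>
    acc match {
      case Some(best) if best.createdAt >= event.createdAt => acc
      case _                                               => Some(event)
    }

  // (3) Merge two partial accumulators from different partitions.
  val mergeSets = (a: Option[Event], b: Option[Event]) =>
    (a, b) match {
      case (Some(x), Some(y)) => if (x.createdAt >= y.createdAt) a else b
      case (None, _)          => b
      case _                  => a
    }

  // (4) aggregateByKey yields RDD[(String, Option[Event])];
  // flatMap unwraps the Option to get RDD[(String, Event)].
  events
    .aggregateByKey(zero)(addToSet, mergeSets)
    .flatMap { case (id, opt) => opt.map(e => (id, e)) }
}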


Answer


You should be able to use reduceByKey here:

rdd
  .keyBy(_.someId)
  .reduceByKey((x, y) => if (x.createdAt > y.createdAt) x else y)
  .values


where the initial keyBy creates (id, object) pairs, reduceByKey selects the most recent object for each key, and values drops the keys.
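
For completeness, a minimal self-contained sketch of this approach; the local SparkContext setup, the Event case class, and the sample data are illustrative assumptions:

import org.apache.spark.{SparkConf, SparkContext}

object KeepLatest {
  // Hypothetical model matching the JSON documents in the question.
  case class Event(name: String, someId: String, createdAt: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("keep-latest").setMaster("local[*]"))

    val rdd = sc.parallelize(Seq(
      Event("Sample",         "123", "2016-09-21T02:16:32+00:00"),
      Event("Sample-Updated", "123", "2016-09-21T03:16:32+00:00")))

    val latest = rdd
      .keyBy(_.someId)                 // (someId, Event) pairs
      .reduceByKey((x, y) =>           // keep the newest event per someId
        if (x.createdAt > y.createdAt) x else y)
      .values                          // drop the keys again

    // Prints only: Event(Sample-Updated,123,2016-09-21T03:16:32+00:00)
    latest.collect().foreach(println)
    sc.stop()
  }
}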

