Filter based on another RDD in Spark

Question

I would like to keep only the employees whose department ID is referenced in the second table.

Employee table
LastName    DepartmentID
Rafferty    31
Jones       33
Heisenberg  33
Robinson    34
Smith       34

Department table
DepartmentID
31
33

I have tried the following code, which does not work:

employee = [['Rafferty',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]]
department = [31,33]
employee = sc.parallelize(employee)
department = sc.parallelize(department)
employee.filter(lambda e: e[1] in department).collect()

Py4JError: An error occurred while calling o344.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

Any ideas? I am using Spark 1.1.0 with Python. However, I would accept a Scala or Python answer.

Recommended answer

In this case, what you want is to filter each partition using the data contained in the department table. This would be the basic solution:

val dept = deptRdd.collect.toSet
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)}
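
Since the question uses Python, here is a rough PySpark sketch of the same idea. It assumes the department RDD is small enough to collect on the driver; the function name `filter_by_collected_set` is hypothetical. The original attempt most likely fails because the lambda references the `department` RDD itself, and an RDD cannot be used inside another RDD's transformation. Collecting the IDs into a plain Python set first avoids that.

```python
def filter_by_collected_set(employee_rdd, department_rdd):
    """Keep employees whose department ID appears in department_rdd.

    Assumes each employee record looks like [last_name, department_id]
    and that the department IDs fit in driver memory.
    """
    # Bring the (small) set of department IDs to the driver.
    valid_ids = set(department_rdd.collect())
    # The lambda now closes over a picklable set instead of an RDD.
    return employee_rdd.filter(lambda e: e[1] in valid_ids)
```

With the `employee` and `department` RDDs from the question, `filter_by_collected_set(employee, department).collect()` should return only the employees in departments 31 and 33.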

If your department data is large, a broadcast variable will improve performance by delivering the data once to every node instead of serializing it with each task:

val deptBC = sc.broadcast(deptRdd.collect.toSet)
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)}
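
A PySpark sketch of the broadcast variant might look like the following. It assumes `sc` is the existing SparkContext, as in the question; the function name `filter_by_broadcast` is hypothetical.

```python
def filter_by_broadcast(sc, employee_rdd, department_rdd):
    """Same filter, but the department IDs are sent as a broadcast variable.

    Assumes sc is an existing SparkContext and each employee record
    looks like [last_name, department_id].
    """
    # Collect the IDs on the driver, then broadcast them to the executors.
    valid_ids_bc = sc.broadcast(set(department_rdd.collect()))
    # Executors read the broadcast data through .value.
    return employee_rdd.filter(lambda e: e[1] in valid_ids_bc.value)
```

The department IDs still have to fit in memory on the driver and on each executor, so this only helps while the lookup set stays reasonably small.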

Although a join would work, it is a very expensive solution because it requires a distributed shuffle of the data (by key). Given that the requirement is a simple filter, sending the data to each partition (as shown above) will provide much better performance.
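
For comparison, the join-based alternative could be sketched in PySpark roughly as below. It is not part of the original answer's code; the function name `filter_by_join` is hypothetical, and the sketch assumes each employee record looks like `[last_name, department_id]`. It may still be worth considering when the department data is too large to collect on the driver.

```python
def filter_by_join(employee_rdd, department_rdd):
    """Keep employees with a known department ID using an inner join.

    This shuffles data by key, so it is expected to cost more than the
    filter-based versions when the department data is small.
    """
    # Key employees by department ID: (department_id, last_name).
    by_dept = employee_rdd.map(lambda e: (e[1], e[0]))
    # Turn each distinct department ID into a (department_id, None) pair.
    dept_pairs = department_rdd.distinct().map(lambda d: (d, None))
    # The inner join keeps only keys present on both sides.
    joined = by_dept.join(dept_pairs)
    # Restore the original [last_name, department_id] shape.
    return joined.map(lambda kv: [kv[1][0], kv[0]])
```

The order of the resulting records is not guaranteed after the shuffle, so sort them if a stable order matters.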
