如何星火执行联接+过滤器?它是可扩展性? [英] How does Spark execute a join + filter? Is it scalable?

查看:135
本文介绍了如何星火执行联接+过滤器?它是可扩展性?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

说我有两个大RDD的,A和B,包含键 - 值对。我想用钥匙加入A和B,但对(A,B)那场比赛,我只想要好的人一小部分。所以,我的加入和事后应用过滤器:

  A.join(B).filter(isGoodPair)

其中, isGoodPair <​​/ code>是告诉我,如果一对(A,B)是好还是不好布尔函数。

有关这很好地扩展,星火的调度将理想避免形成所有对A.join(B)明确。即使是一个广泛分布的基础上,这可能会导致耗时的磁盘泄漏,甚至耗尽在某些节点上的所有内存和磁盘资源。为了避免这种情况,火花应适用的滤波器作为对(A,B)的各分区中生成的

我的问题:


  1. 实际上是否星火做到这一点?

  2. 及其架构方面实现什么或prevent所期望的行为?

  3. 我应该使用也许协同组呢?在PySpark它返回一个迭代器,所以我可以申请我的过滤器迭代器,对吧?


解决方案

我在PySpark壳(运行星火1.2.1)运行的实验来回答这些问题。结论如下:


  1. 不幸的是,星火做的的适用,对通过联接产生的过滤器。它明确地在进行筛选之前生成一整套加入对的。

  2. 这可能是由于火花运行RDD变换一个在一次一。它一般不能够执行这种微妙链的优化。

  3. 将使用协同组而不是加入,我们可以手动执行所需的优化。

实验

我由含有RDD 100组,每组含整数1〜10000,和各组我数是最多1相距的整数数量:

 导入和itertools,因为它
组G = INT(1E2)#号
各组整数N = INT(1E4)#号
nPart = 32#标准分区:8个内核,每个内核4个分区
A = sc.parallelize(名单(it.product(的xrange(G)的xrange(n))的),nPart)DEF joinAndFilter(A):
    返回A.join(A)的.filter(拉姆达(K,(X1,X2)):ABS(X1 - ×2)所述; = 1)DEF cogroupAndFilter(A):
    高清乐趣(XS):
        K,(XS1,XS2)= XS
        返回[(X1,X2)在it.product(XS1,XS2)为(X1,X2)如ABS(X1 - ×2)所述; = 1]
    返回A.cogroup(A).flatMap(有趣)cogroupAndFilter(A).Count之间的()
joinAndFilter(A).Count之间的()

我没有一个简单的方法来分析的code,所以我只是看着它在我的Mac活动监视器运行:

内存使用量飙升,当我用 joinAndFilter ,presumably因为它应用了接一个滤波器之前产生的所有对大牌。其实我有杀PySpark,因为它是通过我所有的记忆吹即将导致系统崩溃。随着 cogroupAndFilter 中,对被过滤,因为它们产生的,所以内存保持在控制之下。

Say I have two large RDD's, A and B, containing key-value pairs. I want to join A and B using the key, but of the pairs (a,b) that match, I only want a tiny fraction of "good" ones. So I do the join and apply a filter afterwards:

A.join(B).filter(isGoodPair)

where isGoodPair is a boolean function that tells me if a pair (a,b) is good or not.

For this to scale well, Spark's scheduler would ideally avoid forming all pairs in A.join(B) explicitly. Even on a massively distributed basis, this could cause time-consuming disk spills, or even exhaust all memory and disk resources on some nodes. To avoid this, Spark should apply the filter as the pairs (a,b) are generated within each partition.

My questions:

  1. Does Spark actually do this?
  2. What aspects of its architecture enable or prevent the desired behavior?
  3. Should I maybe use cogroup instead? In PySpark it returns an iterator, so I can just apply my filter to the iterator, right?

解决方案

I ran an experiment in the PySpark shell (running Spark 1.2.1) to answer these questions. The conclusions are the following:

  1. Unfortunately, Spark does not apply the filter as pairs are generated by the join. It generates the entire set of join pairs explicitly before proceeding to filter them.
  2. This is probably because Spark runs RDD transformations one-at-a-time. It is generally not capable of performing this kind of subtle chaining optimization.
  3. By using cogroup instead of join, we can manually implement the desired optimization.

Experiment

I made an RDD containing 100 groups, each containing the integers 1 to 10,000, and in each group I counted the number of integers that are at most 1 apart:

import itertools as it
g = int(1e2) # number of groups
n = int(1e4) # number of integers in each group
nPart = 32 # standard partitioning: 8 cores, 4 partitions per core
A = sc.parallelize(list(it.product(xrange(g),xrange(n))),nPart) 

def joinAndFilter(A):
    return A.join(A).filter(lambda (k,(x1,x2)): abs(x1 - x2) <= 1)

def cogroupAndFilter(A):
    def fun(xs):
        k,(xs1,xs2) = xs
        return [(x1,x2) for (x1,x2) in it.product(xs1,xs2) if abs(x1 - x2) <= 1]
    return A.cogroup(A).flatMap(fun)

cogroupAndFilter(A).count()
joinAndFilter(A).count() 

I didn't have an easy way to profile the code, so I just watched it run on my mac in Activity Monitor:

Memory usage spiked big-time when I used joinAndFilter, presumably because it's generating all the pairs before applying the off-by-one filter. I actually had to kill PySpark because it was blowing through all my memory and about to crash the system. With cogroupAndFilter, the pairs are filtered as they are generated, so memory stays under control.

这篇关于如何星火执行联接+过滤器?它是可扩展性?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆