Lazy foreach on a Spark RDD


Problem description


I have a big RDD of Strings (obtained through a union of several sc.textFile(...)).

I now want to search for a given string in that RDD, and I want the search to stop when a "good enough" match has been found.

I could retrofit foreach, or filter, or map for this purpose, but all of these will iterate through every element in that RDD, regardless of whether the match has been reached.

Is there a way to short-circuit this process and avoid iterating through the whole RDD?

Solution

I could retrofit foreach, or filter, or map for this purpose, but all of these will iterate through every element in that RDD

Actually, you're wrong. The Spark engine is smart enough to optimize the computation if you limit the results (using take or first):

from __future__ import print_function  # must precede the other imports

import numpy as np

np.random.seed(323)

# Accumulator counting how many elements the predicate actually inspects
acc = sc.accumulator(0)

def good_enough(x, threshold=7000):
    global acc
    acc += 1
    return x > threshold

rdd = sc.parallelize(np.random.randint(0, 10000) for i in xrange(1000000))

# first() lets Spark stop as soon as one element passes the filter
x = rdd.filter(good_enough).first()

Now let's check the accumulator:

>>> print("Checked {0} items, found {1}".format(acc.value, x))
Checked 6 items, found 7109

and just to make sure that everything works as expected:

acc = sc.accumulator(0)
# Unreachable threshold: nothing matches, so the whole RDD is scanned
rdd.filter(lambda x: good_enough(x, 100000)).take(1)
assert acc.value == rdd.count()

The same thing could be done, probably in a more efficient manner, using DataFrames and a UDF.
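For reference, here is a minimal sketch of what that DataFrame variant might look like. The column name "value", the threshold, and the use of sqlContext (assumed to be available in the shell, like sc) are illustrative assumptions, not part of the original answer:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Hypothetical single-column DataFrame built from the same RDD;
# the column name "value" is an illustrative choice.
df = sqlContext.createDataFrame(rdd.map(lambda x: (int(x),)), ["value"])

good_enough_udf = udf(lambda x: x > 7000, BooleanType())

# As with first() on the RDD, asking for a single row lets Spark
# stop scanning once a matching row has been produced.
row = df.filter(good_enough_udf("value")).first()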

Note: In some cases it is even possible to use an infinite sequence in Spark and still get a result. You can check my answer to Spark FlatMap function for huge lists (http://stackoverflow.com/q/31220322/1560062) for an example.
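To illustrate the idea (this sketch is mine, not taken from the linked answer), PySpark's generator-based pipelining means take only pulls the requested number of elements, so a flatMap over an infinite generator is never fully consumed:

def count_from(x):
    # An infinite generator; it is only ever consumed lazily
    i = x
    while True:
        yield i
        i += 1

# take(5) pulls just five elements from the infinite stream
sc.parallelize([0]).flatMap(count_from).take(5)
# [0, 1, 2, 3, 4]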
