Spark - Nested RDD Operation


Problem Description

I have two RDDs, say:

rdd1 =
id | created | destroyed | price
1  | 1       | 2         | 10
2  | 1       | 5         | 11
3  | 2       | 3         | 11
4  | 3       | 4         | 12
5  | 3       | 5         | 11

rdd2 =

[1, 2, 3, 4, 5]  # let's call these values timestamps (ts)

rdd2 is basically generated using range(initial_value, end_value, interval). The parameters can vary, and its size can be the same as or different from rdd1's. The idea is to fetch records from rdd1 for each value of rdd2 using a filtering criteria (records from rdd1 can repeat across fetches, as you can see in the output).

Filtering criteria: rdd1.created <= ts < rdd1.destroyed
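
For concreteness, the setup above could be built like this (a sketch; the range parameters 1, 6, 1 are assumed from the sample data, and matches is a hypothetical helper that just names the predicate):

rdd2 = sc.parallelize(range(1, 6, 1))  # [1, 2, 3, 4, 5]

def matches(r, ts):  # r = (id, created, destroyed, price)
    return r[1] <= ts < r[2]  # created <= ts < destroyed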

Desired output:

ts | prices
1  | 10,11       # i.e. for ids 1,2 of rdd1
2  | 11,11       # ids 2,3
3  | 11,12,11    # ids 2,4,5
4  | 11,11       # ids 2,5

Now I want to filter rdd1 based on a condition that uses the values of rdd2 (described above), and return a result that joins the keys of rdd2 with the filtered records of rdd1.

So I do:

rdd2.map(lambda x: somefilterfunction(x, rdd1))

def somefilterfunction(x, rdd1):
    # Keep records whose lifetime covers ts = x: created <= x < destroyed
    filtered_rdd1 = rdd1.filter(lambda r: r[1] <= x and r[2] > x)
    prices = filtered_rdd1.map(lambda r: r[3])
    res = prices.collect()  # an action on rdd1, invoked inside a map over rdd2
    return (x, list(res))

And I get:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

I tried using groupBy, but elements of rdd1 can repeat across timestamps here, whereas grouping, as I understand it, would club each element of rdd1 into one particular slot just once.

The only way I see now is to use a normal for loop, do the filtering, and join everything at the end.

Any suggestions?

Recommended Answer

Since you use a regular range, there is no reason to create a second RDD at all. You can simply generate the values in the given range for each record:

from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile

rdd1 = sc.parallelize([
    (1, 1, 2, 10),        
    (2, 1, 5, 11),       
    (3, 2, 3, 11),        
    (4, 3, 4, 12),        
    (5, 3, 5, 11),  
])


def generate(start, end, step):
    def _generate(id, created, destroyed, price):
        # Smallest ts >= created
        start_for_record = int(ceil((created - start) / step) * step + start)
        rng = takewhile(
            lambda x: created <= x < destroyed,
            xrange(start_for_record, end, step)) # In Python 3.x use range
        for i in rng:
            yield i, price

    return _generate

# Emit (ts, price) pairs for every timestamp each record covers,
# then group the prices by timestamp
result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()
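
To see how _generate snaps a record onto the timestamp grid, take the record (4, 3, 4, 12) with start=1 and step=1: start_for_record = ceil((3 - 1) / 1) * 1 + 1 = 3, and takewhile keeps 3 (since 3 <= 3 < 4) before stopping at 4, so this record yields only (3, 12).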

And the result is:

result.mapValues(list).collect()

## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]
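
If the timestamps did not come from a regular range, a common alternative (the pattern the SPARK-5063 message alludes to) is to collect the small timestamp set to the driver and broadcast it, so that no RDD is referenced inside another RDD's transformation. A minimal sketch, assuming rdd2 exists and is small enough to collect:

# Collect the timestamps once on the driver and broadcast the plain list
ts_broadcast = sc.broadcast(rdd2.collect())

result = (rdd1
    .flatMap(lambda r: [(ts, r[3]) for ts in ts_broadcast.value
                        if r[1] <= ts < r[2]])  # created <= ts < destroyed
    .groupByKey())

This produces the same (ts, prices) grouping as above.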
