Partition RDD into tuples of length n


Problem description

I am relatively new to Apache Spark and Python and was wondering whether something like what I am about to describe is doable.

I have an RDD of the form [m1, m2, m3, m4, m5, m6, ..., mn] (you get this when you run rdd.collect()). I was wondering if it is possible to transform this RDD into another RDD of the form [(m1, m2, m3), (m4, m5, m6), ..., (mn-2, mn-1, mn)]. The inner tuples should be of size k. If n is not divisible by k, then one of the tuples should have fewer than k elements.

I tried using the map function but was not able to get the desired output. It seems that the map function can only return an RDD with the same number of elements as the RDD that was initially provided.
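For illustration (a minimal sketch with hypothetical element values): map produces exactly one output element per input element, so on its own it cannot merge neighbouring elements into a tuple.

rdd = sc.parallelize(["m1", "m2", "m3", "m4"])
rdd.map(lambda m: (m,)).collect()   # [('m1',), ('m2',), ('m3',), ('m4',)] - still four elements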

UPDATE: I tried using partitions and was also able to get it to work:

rdd.map(lambda l: (l, l)).partitionBy(int(n/k)).glom().map(lambda ll: [x[0] for x in ll])
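Unpacked, that one-liner reads roughly as follows (a sketch with hypothetical values for n, k, and the elements; note that partitionBy hashes the keys, so it does not guarantee that consecutive elements land in the same chunk or that each chunk holds exactly k elements):

k = 3
rdd = sc.parallelize(["m1", "m2", "m3", "m4", "m5", "m6", "m7"])
n = rdd.count()                            # total number of elements
chunked = (rdd.map(lambda l: (l, l))       # build (key, value) pairs so partitionBy can be used
              .partitionBy(int(n / k))     # spread the pairs over roughly n/k partitions
              .glom()                      # turn each partition into a single list
              .map(lambda ll: [x[0] for x in ll]))  # keep only the original elements
print(chunked.collect())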

Solution

Olologin's answer almost has it, but I believe what you are trying to do is group your RDD into 3-tuples rather than group your RDD into 3 groups of tuples. To do the former, try the following:

rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
transformed = (rdd.zipWithIndex()
                  .groupBy(lambda (_, i): i / 3)
                  .map(lambda (_, list): tuple([elem[0] for elem in list])))

When run in pyspark, I get the following:

>>> from __future__ import print_function    
>>> rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
>>> transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3).map(lambda (_, list): tuple([elem[0] for elem in list]))
>>> transformed.foreach(print)
...
('e4', 'e5', 'e6')
('e10',)
('e7', 'e8', 'e9')
('e1', 'e2', 'e3')
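One caveat if you are on a newer stack: the lambdas above rely on Python 2 tuple-parameter unpacking, which Python 3 removed, and / became true division. A rough Python 3 sketch of the same approach (same hypothetical element values) might look like:

rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
transformed = (rdd.zipWithIndex()                                     # (element, index) pairs
                  .groupBy(lambda pair: pair[1] // 3)                 # bucket by index // 3
                  .map(lambda kv: tuple(elem[0] for elem in kv[1])))  # keep only the elements
transformed.foreach(print)

As in the output above, the order in which the tuples are printed is not guaranteed, since foreach runs on the executors partition by partition.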
