Partition RDD into tuples of length n


Problem description

I am relatively new to Apache Spark and Python and was wondering if something like what I am about to describe is doable.

I have an RDD of the form [m1, m2, m3, m4, m5, m6.......mn] (this is what you get when you run rdd.collect()). I was wondering if it is possible to transform this RDD into another RDD of the form [(m1, m2, m3), (m4, m5, m6).....(mn-2, mn-1, mn)]. The inner tuples should be of size k. If n is not divisible by k, then one of the tuples should have fewer than k elements.

I tried using the map function but was not able to get the desired output. It seems that the map function can only return an RDD with the same number of elements as the RDD it was given.

UPDATE: I tried using partitioning and was able to get it to work.

rdd.map(lambda l: (l, l)).partitionBy(int(n/k)).glom().map(lambda ll: [x[0] for x in ll])
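
For context, here is a minimal sketch of how that one-liner might be invoked; the concrete RDD, n, and k below are illustrative assumptions, not part of the original post. Note that partitionBy uses a hash partitioner by default, so elements are grouped by key hash rather than by their original positions, and the resulting lists are only roughly of size k.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

rdd = sc.parallelize(["m1", "m2", "m3", "m4", "m5", "m6", "m7"])
n = rdd.count()  # total number of elements (7 here) -- illustrative values
k = 3            # desired group size

grouped = (rdd.map(lambda l: (l, l))                # build (key, value) pairs so partitionBy applies
              .partitionBy(int(n / k))              # spread the pairs across int(n/k) partitions
              .glom()                               # collect each partition into a single list
              .map(lambda ll: [x[0] for x in ll]))  # keep only the original elements
print(grouped.collect())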

Solution

Olologin's answer almost has it, but I believe what you are trying to do is group your RDD into 3-tuples instead of grouping your RDD into 3 groups of tuples. To do the former, try the following:

rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
transformed = (rdd.zipWithIndex()
                  .groupBy(lambda (_, i): i / 3)
                  .map(lambda (_, list): tuple([elem[0] for elem in list])))

When run in pyspark, I get the following:

>>> from __future__ import print_function    
>>> rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
>>> transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3).map(lambda (_, list): tuple([elem[0] for elem in list]))
>>> transformed.foreach(print)
...
('e4', 'e5', 'e6')
('e10',)
('e7', 'e8', 'e9')
('e1', 'e2', 'e3')
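
As a side note, the tuple-unpacking lambdas above (lambda (_, i): ...) are Python 2-only syntax, and i / 3 relies on Python 2 integer division; the groups also print in whatever order the partitions happen to be processed. A minimal Python 3 sketch of the same grouping, with an added sortByKey in case the original order of the tuples matters (an extra step, not part of the original answer), might look like this:

rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
transformed = (rdd.zipWithIndex()                               # (element, index) pairs
                  .groupBy(lambda pair: pair[1] // 3)           # bucket indices into groups of 3
                  .sortByKey()                                  # keep the 3-tuples in their original order
                  .map(lambda kv: tuple(e[0] for e in kv[1])))  # drop the index, keep the elements
print(transformed.collect())
# e.g. [('e1', 'e2', 'e3'), ('e4', 'e5', 'e6'), ('e7', 'e8', 'e9'), ('e10',)]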
