How to convert a DataFrame back to normal RDD in pyspark?


Problem description

I need to use the

(rdd.)partitionBy(npartitions, custom_partitioner)

method, which is not available on the DataFrame. All of the DataFrame methods refer only to DataFrame results. So how do I create an RDD from the DataFrame data?

Note: this is a change in 1.3.0 from 1.2.0.

Update from the answer from @dpangmao: the method is .rdd. I was interested to understand whether (a) it is public and (b) what the performance implications are.
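For context, the end goal would look roughly like the sketch below: get an RDD out of the DataFrame, key it, and apply partitionBy with a custom partitioner. The column index, partition count, and the body of custom_partitioner are illustrative assumptions, not part of the original question.

npartitions = 8

def custom_partitioner(key):
    # hypothetical partitioner: route keys to partitions by hash
    return hash(key) % npartitions

# partitionBy is only defined on pair (key, value) RDDs, so key the rows first
pair_rdd = df.rdd.map(lambda row: (row[0], row))
partitioned = pair_rdd.partitionBy(npartitions, custom_partitioner)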

Well, (a) is yes, and as for (b), you can see here that there are significant performance implications: a new RDD must be created by invoking mapPartitions:

In dataframe.py (note that the file name changed as well; it was previously sql.py):

@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)

    return self._lazy_rdd
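So .rdd is a public property, and accessing it lazily builds and caches the converted RDD. A minimal usage sketch follows (assuming a SparkSession named spark on a recent PySpark; on 1.3 you would build the DataFrame from an SQLContext instead):

from pyspark.sql import Row

df = spark.createDataFrame([Row(k=1, v="a"), Row(k=2, v="b")])
rdd = df.rdd                    # RDD of Row objects, built via mapPartitions
print(rdd.first())              # Row(k=1, v='a')
print(rdd.getNumPartitions())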

Recommended answer

@dapangmao's answer works, but it doesn't give a regular Spark RDD; it returns an RDD of Row objects. If you want the regular RDD format, try this:

rdd = df.rdd.map(tuple)

rdd = df.rdd.map(list)
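For example, with a small illustrative DataFrame (the column names and values are made up):

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])

df.rdd.map(tuple).collect()   # [(1, 'a'), (2, 'b')]  -- plain tuples instead of Rows
df.rdd.map(list).collect()    # [[1, 'a'], [2, 'b']]  -- plain lists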
