How to convert a DataFrame back to normal RDD in pyspark?

Question

I need to use the

(rdd.)partitionBy(npartitions, custom_partitioner)

method, which is not available on the DataFrame. All of the DataFrame methods refer only to DataFrame results. So how do I create an RDD from the DataFrame data?

Note: this is a change (in 1.3.0) from 1.2.0.
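For concreteness, here is a minimal sketch of the goal under assumed data: the toy DataFrame, the id key column, the partition count, and the parity-based custom_partitioner below are invented for illustration, not taken from the question. partitionBy is defined on key-value (pair) RDDs, not on DataFrames, so the DataFrame has to be turned into one first:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# toy data; column names and values are made up for this sketch
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])

def custom_partitioner(key):
    # hypothetical partitioner: send keys to partitions by parity
    return key % 2

# partitionBy is a pair-RDD method, so drop down to the RDD API first
pair_rdd = df.rdd.map(lambda row: (row["id"], row))
partitioned = pair_rdd.partitionBy(2, custom_partitioner)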

Update, based on @dpangmao's answer: the method is .rdd. I was interested to understand whether (a) it is public and (b) what the performance implications are.

Well, (a) is yes, and for (b) you can see here that there are significant performance implications: a new RDD must be created by invoking mapPartitions:

In dataframe.py (note that the file name changed as well; it was sql.py):

@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)

    return self._lazy_rdd
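A rough way to see that cost in practice (the dataset size below and any timings are illustrative assumptions, and SparkSession is the modern entry point rather than the 1.3-era SQLContext): work expressed on the DataFrame stays in the JVM, while df.rdd pushes every Row through Python serialization inside that mapPartitions call.

import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000000)  # a single-column DataFrame of longs

t0 = time.time()
df.count()                 # evaluated entirely on the JVM side
print("DataFrame count took %.2fs" % (time.time() - t0))

t0 = time.time()
df.rdd.count()             # each Row is pickled into the Python workers first
print("df.rdd count took %.2fs" % (time.time() - t0))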

Recommended answer

@dapangmao's answer works, but it doesn't give you a regular Spark RDD; it returns an RDD of Row objects. If you want the regular RDD format, try this:

rdd = df.rdd.map(tuple)

or

rdd = df.rdd.map(list)
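To make the difference concrete, a small sketch (column names and values are invented for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

print(df.rdd.take(2))             # [Row(id=1, value='a'), Row(id=2, value='b')]
print(df.rdd.map(tuple).take(2))  # [(1, 'a'), (2, 'b')]
print(df.rdd.map(list).take(2))   # [[1, 'a'], [2, 'b']]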
