How to convert pyspark.rdd.PipelinedRDD to Data frame without using collect() method in Pyspark?

Question

I have a pyspark.rdd.PipelinedRDD (Rdd1). When I call Rdd1.collect(), it gives a result like the one below.

 [(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
 (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
 (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
 (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
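
(For reference, a minimal sketch that rebuilds this RDD locally; `sc` is assumed here to be an existing SparkContext, e.g. spark.sparkContext:)

# Reconstruct the sample RDD from the question for local testing
Rdd1 = sc.parallelize([
    (10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
    (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
    (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
    (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417}),
])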

Now I want to convert the pyspark.rdd.PipelinedRDD to a data frame without using the collect() method.

My final data frame should look like the one below; df.show() should give:

+----------+-------+-------------------+
|CId       |IID    |Score              |
+----------+-------+-------------------+
|10        |4      |2.9996439803387602 |
|10        |5      |1.6767412921625855 |
|10        |3      |3.616726727464709  |
|1         |4      |-1.5271512313750577|
|1         |5      |1.9665475696370045 |
|1         |3      |2.016527311459324  |
|2         |4      |4.033642544526678  |
|2         |5      |3.1517805604906313 |
|2         |3      |6.230272144805092  |
|3         |4      |2.9757316477407443 |
|3         |5      |-1.5689126834176417|
|3         |3      |-0.3924680103722977|
+----------+-------+-------------------+

I can achieve this by applying collect(), iterating over the result, and finally building the data frame.

But now I want to convert the pyspark.rdd.PipelinedRDD (RDD1) to a data frame without using any collect() method.

Please let me know how to achieve this.

Answer

You want to do two things here: (1) flatten your data and (2) put it into a data frame. One way to do this is as follows.

First, let us flatten the dictionary:

# Emit one (CId, (IID, Score)) record per dictionary entry
rdd2 = Rdd1.flatMapValues(lambda x: x.items())

When collecting the data, you get something like this:

[(10, (3, 3.616726727464709)), (10, (4, 2.9996439803387602)), ...
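
flatMapValues keeps each key and emits one output record per element of the iterable returned for its value. For illustration only, the same step spelled out with plain flatMap would be:

# Equivalent to the flatMapValues call above (illustrative sketch)
rdd2 = Rdd1.flatMap(lambda kv: [(kv[0], item) for item in kv[1].items()])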

Then we can format the data and turn it into a data frame:

# Reshape (CId, (IID, Score)) into flat (CId, IID, Score) rows, then name the columns
rdd2.map(lambda x: (x[0], x[1][0], x[1][1]))\
    .toDF(["CId", "IID", "Score"])\
    .show()

Which gives you this:

+---+---+-------------------+
|CId|IID|              Score|
+---+---+-------------------+
| 10|  3|  3.616726727464709|
| 10|  4| 2.9996439803387602|
| 10|  5| 1.6767412921625855|
|  1|  3|  2.016527311459324|
|  1|  4|-1.5271512313750577|
|  1|  5| 1.9665475696370045|
|  2|  3|  6.230272144805092|
|  2|  4|  4.033642544526678|
|  2|  5| 3.1517805604906313|
|  3|  3|-0.3924680103722977|
|  3|  4| 2.9757316477407443|
|  3|  5|-1.5689126834176417|
+---+---+-------------------+
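
If you want explicit column types instead of relying on Spark's inference, a sketch with an explicit schema (assuming `spark` is the active SparkSession) would be:

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Hypothetical explicit schema; field names match the expected output above
schema = StructType([
    StructField("CId", IntegerType(), False),
    StructField("IID", IntegerType(), False),
    StructField("Score", DoubleType(), False),
])

df = spark.createDataFrame(rdd2.map(lambda x: (x[0], x[1][0], x[1][1])), schema)
df.show()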
