Spark dataframe reduceByKey

Question

I am using Spark 1.5/1.6, and I want to do a reduceByKey-style operation on a DataFrame; I don't want to convert the df to an RDD.

Each row looks like the following, and I have multiple rows per id1.

id1, id2, score, time

I want something like this:

id1, [ (id21, score21, time21), (id22, score22, time22), (id23, score23, time23) ]

So, for each "id1", I want all of its records collected in a list.
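
For reference, this output shape can be expressed directly with DataFrame aggregation; a minimal sketch follows, assuming the column names from the question, and noting that collect_list over struct columns was not supported in Spark 1.5/1.6, so this generally needs a newer version:

 import org.apache.spark.sql.functions.{collect_list, struct}

 // Group by id1 and collect the remaining columns as a list of structs
 // (column names taken from the question)
 val grouped = df.groupBy("id1")
   .agg(collect_list(struct("id2", "score", "time")).as("records"))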

By the way, the reason I don't want to convert the df to an RDD is that I have to join this (reduced) DataFrame with another DataFrame, and I repartition on the join key, which makes the join faster; I guess the same cannot be done with an RDD.
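
As a side note, the repartition-before-join pattern referred to here can be written on the DataFrame API roughly as follows; repartitioning by column expressions is available from Spark 1.6, and reducedDf and otherDf are hypothetical names:

 // Repartition both sides on the join key before joining (hypothetical DataFrames)
 val leftPart  = reducedDf.repartition(reducedDf("id1"))
 val rightPart = otherDf.repartition(otherDf("id1"))
 val joined    = leftPart.join(rightPart, "id1")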

Any help would be appreciated.

Answer

To preserve the partitioning already achieved, re-use the parent RDD's partitioner in the reduceByKey invocation:

 // reduceByKey needs a key-value RDD, so key each row by id1 and wrap the
 // remaining fields in a one-element list (field types are assumed here)
 val rdd = df.rdd.map(row =>
   (row.getString(0), List((row.getString(1), row.getDouble(2), row.getLong(3)))))
 val parentRdd = rdd.dependencies(0).rdd  // assuming the first parent has the
                                          // desired partitioning: adjust as needed
 val parentPartitioner = parentRdd.partitioner.get  // partitioner is an Option; assumes one is set
 val optimizedReducedRdd = rdd.reduceByKey(parentPartitioner, reduceFn)
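
The reduceFn referenced above is not defined in the original answer; under the pair layout sketched above, a hypothetical reduce function (declared before the reduceByKey call) that simply concatenates the per-key record lists, matching the output shape asked for in the question, could be:

 // Hypothetical reduce function: concatenate the per-key record lists
 type Record = (String, Double, Long)   // (id2, score, time); field types are assumed
 val reduceFn: (List[Record], List[Record]) => List[Record] = _ ++ _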

If you were instead not to specify a partitioner, as follows:

 rdd.reduceByKey(reduceFn)  // not optimized: this triggers a full shuffle

then the behavior you noted would occur, i.e. a full shuffle, because the default HashPartitioner would be used instead.
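
If you want to confirm which case you are in, one way (a small sketch, not part of the original answer) is to inspect the partitioner of the resulting RDD:

 // Sanity check: inspect which partitioner the reduced RDD ended up with
 optimizedReducedRdd.partitioner match {
   case Some(p) => println(s"partitioner: ${p.getClass.getSimpleName}, partitions: ${p.numPartitions}")
   case None    => println("no partitioner set")
 }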
