分区上的Spark Scala DataFrame功能 [英] spark scala dataframe function over partition

查看:83
本文介绍了分区上的Spark Scala DataFrame功能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有2千万行,每千个组看起来像这样

I have 200 Mil rows with 1K groups looking like this

Group     X             Y             Z          Q           W
group1  0.054464866 0.002248819 0.299069804 0.763352879 0.395905106
group2  0.9986218   0.023649037 0.50762069  0.212225807 0.619571705
group1  0.839928517 0.290339179 0.050407454 0.75837838  0.495466007
group1  0.021003132 0.663366686 0.687928832 0.239132224 0.020848608
group1  0.393843426 0.006299292 0.141103438 0.858481036 0.715860852
group2  0.045960198 0.014858905 0.672267793 0.59750871  0.893646818

我想为每个函数运行相同的函数(例如,对 [X,Z,Q,W] 进行 X 线性回归)组中的.我本可以做 Window.partition 等,但是我有自己的功能.目前,我执行以下操作:

I want to run the same function (say linear regression of X on [X, Z, Q, W]) for each of the groups. I could have done Window.partition etc. but I have my own function. At the moment, I do the following:

df.select("Group").distinct.collect.toList.foreach{group => 
val dfGroup = df.filter(col("Group")===group
dfGroup.withColumn("res", myUdf(col("X"), col("Y"), col("Z"), col("Q"), col("W"))}

想知道是否有更好的方法吗?

Wonder if there is a better way to do?

推荐答案

根据您的喜好,您至少有两个选项:DataFrame或Dataset.

You have minimum two options depending what you prefer: DataFrame or Dataset.

df
  .groupBy("group")
  .agg(myUdaf(col("col1"), col("col2")))

其中 myUdaf 是UDAF

在这里您可以找到如何实施UDAF的示例: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

Here you can find example how to implement UDAF: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

您可以使用Dataset API中的 groupByKey mapGroups 转换:

You can use groupByKey and mapGroups transformations from Dataset API:

ds
  .groupByKey(_.group)
  .mapGroups{case (group, values) =>
    (group, aggregator(values))
  }

其中 aggregator 是Scala函数,负责聚合对象集合.

where aggregator is Scala function responsible for aggregating collection of objects.

如果您不需要汇总,则可以使用 map 转换来映射 values ,例如:

If you don't need aggregating you can just map values using map transformation, example:

values.map(v => fun(...))

这篇关于分区上的Spark Scala DataFrame功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆