Scala/Spark dataframes: find the column name corresponding to the max


Question

In Scala/Spark, given a dataframe:

// Sample data: an "id" column followed by three numeric value columns.
val dfIn = sqlContext.createDataFrame(Seq(
  ("r0", 0, 2, 3),
  ("r1", 1, 0, 0),
  ("r2", 0, 2, 2))).toDF("id", "c0", "c1", "c2")

I would like to compute a new column maxCol holding the name of the column corresponding to the max value (for each row). With this example, the output should be:

+---+---+---+---+------+
| id| c0| c1| c2|maxCol|
+---+---+---+---+------+
| r0|  0|  2|  3|    c2|
| r1|  1|  0|  0|    c0|
| r2|  0|  2|  2|    c1|
+---+---+---+---+------+

The actual dataframe has more than 60 columns, so a generic solution is required.

The equivalent in Python Pandas (yes, I know, I should compare with pyspark...) could be:

import pandas as pd

# Assumes dfIn is a pandas DataFrame indexed by id, containing only numeric columns.
dfOut = pd.concat([dfIn, dfIn.idxmax(axis=1).rename('maxCol')], axis=1)

Answer

With a small trick you can use the greatest function. Required imports:

import org.apache.spark.sql.functions.{col, greatest, lit, struct}

First let's create a list of structs, where the first element is the value and the second is the column name:

// Skip the first column ("id"); pair each value with its column name.
val structs = dfIn.columns.tail.map(
  c => struct(col(c).as("v"), lit(c).as("k"))
)

A structure like this can be passed to greatest as follows:

dfIn.withColumn("maxCol", greatest(structs: _*).getItem("k"))

+---+---+---+---+------+
| id| c0| c1| c2|maxCol|
+---+---+---+---+------+
| r0|  0|  2|  3|    c2|
| r1|  1|  0|  0|    c0|
| r2|  0|  2|  2|    c2|
+---+---+---+---+------+

Please note that in case of ties it takes the element that occurs later in the sequence (lexicographically, (x, "c2") > (x, "c1")). If for some reason this is not acceptable, you can explicitly reduce with when:

import org.apache.spark.sql.functions.when

// Pairwise comparison keeps the first struct on ties (>=), so earlier columns win.
val max_col = structs.reduce(
  (c1, c2) => when(c1.getItem("v") >= c2.getItem("v"), c1).otherwise(c2)
).getItem("k")

dfIn.withColumn("maxCol", max_col)

+---+---+---+---+------+
| id| c0| c1| c2|maxCol|
+---+---+---+---+------+
| r0|  0|  2|  3|    c2|
| r1|  1|  0|  0|    c0|
| r2|  0|  2|  2|    c1|
+---+---+---+---+------+

In case of nullable columns you have to adjust this, for example by coalescing the values to -Inf.
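A minimal sketch of that adjustment, assuming all value columns are numeric (safeStructs is a name introduced here for illustration):

import org.apache.spark.sql.functions.{coalesce, col, greatest, lit, struct}

// Coalesce each value to -Infinity so that a null can never win the comparison.
// Assumes every value column is numeric.
val safeStructs = dfIn.columns.tail.map(
  c => struct(coalesce(col(c), lit(Double.NegativeInfinity)).as("v"), lit(c).as("k"))
)

dfIn.withColumn("maxCol", greatest(safeStructs: _*).getItem("k"))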
