PySpark: compute row maximum of the subset of columns and add to an existing dataframe


Problem description

I would like to compute the maximum of a subset of columns for each row and add it as a new column for the existing Dataframe.

I managed to do this in a very awkward way:

def add_colmax(df, subset_columns, colnm):
    '''
    Calculate the maximum of the selected "subset_columns" from dataframe df for each row;
    a new column containing the row-wise maximum is added to dataframe df.

    df: dataframe. It must contain subset_columns as a subset of its columns
    colnm: name of the new column containing the row-wise maximum of subset_columns
    subset_columns: the subset of columns from which the row-wise maximum is computed
    '''
    import numpy as np
    from pyspark.sql.functions import monotonicallyIncreasingId
    from pyspark.sql import Row

    def get_max_row_with_None(row):
        return float(np.max(row))

    df_subset = df.select(subset_columns)
    rdd = df_subset.map(get_max_row_with_None)
    df_rowsum = rdd.map(Row(colnm)).toDF()
    df_rowsum = df_rowsum.withColumn("id", monotonicallyIncreasingId())
    df = df.withColumn("id", monotonicallyIncreasingId())
    df = df.join(df_rowsum, df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
    return df

This function works as follows:

rdd1 =  sc.parallelize([("foo", 1.0,3.0,None), 
                    ("bar", 2.0,2.0,-10), 
                    ("baz", 3.3,1.2,10.0)])


df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4'))
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum")   
df_new.collect()

which returns:

 [Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
  Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
  Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]

I think that if I could use user defined functions with withColumn, this could be done much more simply. But I could not figure out how to do it. Please let me know if you have a simpler way to achieve this. I am using Spark 1.6.

Answer

Let's start with a couple of imports:

from pyspark.sql.functions import col, lit, coalesce, greatest

Next, define a minus-infinity literal:

minf = lit(float("-inf"))

Map over the columns (coalescing nulls to minf) and pass the result to greatest:

rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']])

Finally use withColumn:

df1.withColumn("rowmax", rowmax)

The result:

+---+---+---+----+------+
| v1| v2| v3|  v4|rowmax|
+---+---+---+----+------+
|foo|1.0|3.0|null|   3.0|
|bar|2.0|2.0| -10|   2.0|
|baz|3.3|1.2|null|   3.3|
+---+---+---+----+------+

You can use the same pattern for different row-wise operations, replacing minf with the appropriate neutral element. For example:

rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']])
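
A minimal usage sketch, reusing df1 from the question and the rowsum expression above (the "rowsum" column name is just an illustration; nulls are treated as 0 here):

df1.withColumn("rowsum", rowsum)
# for the sample rows this gives roughly 4.0, -6.0 and 14.5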

Or:

from operator import mul
from functools import reduce

rowproduct = reduce(
  mul, 
  [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']]
)
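
Another short sketch, assuming df1 and the rowproduct expression above (nulls are treated as 1 here):

df1.withColumn("rowproduct", rowproduct)
# for the sample rows this gives 3.0, -40.0 and roughly 39.6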

Your own code could be significantly simplified with udf:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def get_max_row_with_None_(*cols):
    return float(max(x for x in cols if x is not None))

get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4'))
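
Note that this udf raises a ValueError for a row in which all of the selected columns are null (max() of an empty sequence). A null-safe variant is sketched below, under the assumption that returning None is acceptable for such rows:

def get_max_row_with_None_(*cols):
    # keep only the non-null values; return None (i.e. SQL null) when there are none
    vals = [x for x in cols if x is not None]
    return float(max(vals)) if vals else None

get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())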

Replace minf with lit(float("inf")) and greatest with least to get the smallest value per row.
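
For completeness, a minimal sketch of that row-minimum variant (the "rowmin" column name is arbitrary):

from pyspark.sql.functions import least

pinf = lit(float("inf"))
rowmin = least(*[coalesce(col(x), pinf) for x in ['v2','v3','v4']])
df1.withColumn("rowmin", rowmin)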

