如何计算pyspark中每行某些列的最大值 [英] how to calculate max value in some columns per row in pyspark
问题描述
我在 pyspark 中有一个使用 sqlContext.sql 函数读取的数据框.这包含 4 个数字列,其中包含每个客户端的信息(这是键 ID).我需要计算每个客户端的最大值并将此值加入数据框:
I have a data frame read with sqlContext.sql function in pyspark. This contains 4 numerics columns with information per client (this is the key id). I need to calculate the max value per client and join this value to the data frame:
+--------+-------+-------+-------+-------+
|ClientId|m_ant21|m_ant22|m_ant23|m_ant24|
+--------+-------+-------+-------+-------+
| 0| null| null| null| null|
| 1| null| null| null| null|
| 2| null| null| null| null|
| 3| null| null| null| null|
| 4| null| null| null| null|
| 5| null| null| null| null|
| 6| 23| 13| 17| 8|
| 7| null| null| null| null|
| 8| null| null| null| null|
| 9| null| null| null| null|
| 10| 34| 2| 4| 0|
| 11| 0| 0| 0| 0|
| 12| 0| 0| 0| 0|
| 13| 0| 0| 30| 0|
| 14| null| null| null| null|
| 15| null| null| null| null|
| 16| 37| 29| 29| 29|
| 17| 0| 0| 16| 0|
| 18| 0| 0| 0| 0|
| 19| null| null| null| null|
+--------+-------+-------+-------+-------+
在这种情况下,客户端六"的最大值为 23,客户端十"的最大值为 30.空"在新列中自然为空.
In this case, the max value to the client "six" is 23 and the client "ten" is 30. the "null" is naturally null in the new column.
请帮助我展示如何进行此操作.
Please help me showing how can i do this operation.
推荐答案
我认为将值组合到列表中而不是在列表中查找最大值是最简单的方法.
I think combing values to a list and than finding max on it would be the simplest approach.
from pyspark.sql.types import *
schema = StructType([
StructField("ClientId", IntegerType(), True),
StructField("m_ant21", IntegerType(), True),
StructField("m_ant22", IntegerType(), True),
StructField("m_ant23", IntegerType(), True),
StructField("m_ant24", IntegerType(), True)
])
df = spark\
.createDataFrame(
data=[(0, None, None, None, None),
(1, 23, 13, 17, 99),
(2, 0, 0, 0, 1),
(3, 0, None, 1, 0)],
schema=schema)
import pyspark.sql.functions as F
def agg_to_list(m21,m22,m23,m24):
return [m21,m22,m23,m24]
u_agg_to_list = F.udf(agg_to_list, ArrayType(IntegerType()))
df2 = df.withColumn('all_values', u_agg_to_list('m_ant21', 'm_ant22', 'm_ant23', 'm_ant24'))\
.withColumn('max', F.sort_array("all_values", False)[0])\
.select('ClientId', 'max')
df2.show()
输出:
+--------+----+
|ClientId|max |
+--------+----+
|0 |null|
|1 |99 |
|2 |1 |
|3 |1 |
+--------+----+
这篇关于如何计算pyspark中每行某些列的最大值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!