Spark: Applying UDF to Dataframe Generating new Columns based on Values in DF
Question
I am having problems transposing values in a DataFrame in Scala. My initial DataFrame looks like this:
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   A|   X|   6|null|
|   B|   Z|null|   5|
|   C|   Y|   4|null|
+----+----+----+----+
col1 and col2 are of type String, while col3 and col4 are of type Int.

The result should look like this:
+----+----+----+----+------+------+------+
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol3|
+----+----+----+----+------+------+------+
|   A|   X|   6|null|     6|  null|  null|
|   B|   Z|null|   5|  null|     5|  null|
|   C|   Y|   4|null|  null|  null|     4|
+----+----+----+----+------+------+------+
That means that the three new columns should be named after col1, col2 and the column the value is extracted from. The extracted value comes from column col3 or col4, depending on which one is not null.
So how to achieve that? I first thought of a UDF like this:
def myFunc(col1: String, col2: String, col3: Long, col4: Long): (String, Long) = {
  if (col3 == null) { // broken: a Scala Long is a value type and can never be null
    (col1 + col2 + "col4", col4)
  } else {
    (col1 + col2 + "col3", col3)
  }
}
val udfMyFunc = udf(myFunc _) // needed to treat it as a partially applied function
But how can I call it from the dataframe in the right way?
Of course, all the code above is rubbish and there could be a much better way. Since I am just juggling with the first code snippets, let me know... Comparing the Int value to null is already not working.
Any help appreciated! Thanks!
Answer
There is a simpler way:
val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) // Step 1
  .withColumn("value", when($"col3".isNotNull, $"col3").otherwise($"col4")) // Step 2
  .groupBy($"col1", $"col2", $"col3", $"col4", $"newCol") // Step 3
  .pivot("newCol") // Step 4
  .agg(max($"value")) // Step 5
  .orderBy($"newCol") // Step 6
  .drop($"newCol") // Step 7
df3.show()
The steps are:

- Add a new column which contains the contents of col1 concatenated with col2
- Add a new column, "value", which contains the non-null contents of either col3 or col4
- Group by the columns you want
- Pivot on newCol, which contains the values which are now to become column headings
- Aggregate by the max of value, which will be the value itself if the groupBy is single-valued per group; alternatively use .agg(first($"value")) if value happens to be a string rather than a numeric type, since the max function can only be applied to numeric types
- Order by newCol so the DF is in ascending order
- Drop newCol, as you no longer need it; or skip this step if you want a column of values without nulls
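To see what the groupBy/pivot/agg stages produce, the same steps can be traced on plain Scala collections (a didactic sketch only, not the Spark API; the rows are taken from the example DataFrame):

```scala
// Rows of the initial DataFrame as (col1, col2, col3, col4) tuples
val rows = Seq(
  ("A", "X", Some(6), None),
  ("B", "Z", None, Some(5)),
  ("C", "Y", Some(4), None)
)

// Steps 1-2: build newCol (col1 ++ col2) and the non-null value for each row,
// mirroring concat(...) and when(col3.isNotNull, col3).otherwise(col4)
val keyed = rows.map { case (c1, c2, c3, c4) => (c1 + c2, c3.orElse(c4)) }

// Steps 3-5: each distinct newCol becomes a key (a "column" after pivoting),
// aggregated with max, which is the value itself in single-valued groups
val pivoted = keyed.groupBy(_._1).map { case (k, vs) =>
  k -> vs.flatMap(_._2).maxOption
}
```

Here `pivoted` ends up as `Map("AX" -> Some(6), "BZ" -> Some(5), "CY" -> Some(4))`, i.e. exactly the AX/BZ/CY columns of the pivoted result.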
Credit due to @user8371915 who helped me answer my own pivot question in the first place.
Here is the result:
+----+----+----+----+----+----+----+
|col1|col2|col3|col4|  AX|  BZ|  CY|
+----+----+----+----+----+----+----+
|   A|   X|   6|null|   6|null|null|
|   B|   Z|null|   5|null|   5|null|
|   C|   Y|   4|null|null|null|   4|
+----+----+----+----+----+----+----+
You might have to play around with the column header strings concatenation to get the right result.
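For instance, to get suffixed headers like AXcol3/BZcol4 instead of AX/BZ, the suffix can be chosen with the same when/otherwise pattern in the first step. An untested sketch against the answer's df2 (it assumes the same session and spark.implicits._ import as the code above):

```scala
import org.apache.spark.sql.functions.{concat, lit, when}

// Append "col3" or "col4" to the pivot key, depending on where the value lives
val withSuffix = df2.withColumn("newCol",
  concat($"col1", $"col2",
    when($"col3".isNotNull, lit("col3")).otherwise(lit("col4"))))
```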