Spark: Applying UDF to Dataframe Generating new Columns based on Values in DF
Question
I am having problems transposing values in a DataFrame in Scala. My initial DataFrame looks like this:
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| A| X| 6|null|
| B| Z|null| 5|
| C| Y| 4|null|
+----+----+----+----+
col1 and col2 are of type String, and col3 and col4 are Int.
The result should look like this:
+----+----+----+----+------+------+------+
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol3|
+----+----+----+----+------+------+------+
|   A|   X|   6|null|     6|  null|  null|
|   B|   Z|null|   5|  null|     5|  null|
|   C|   Y|   4|null|  null|  null|     4|
+----+----+----+----+------+------+------+
That means that the three new columns should be named after col1, col2 and the column the value is extracted from. The extracted value comes from col3 or col4, depending on which one is not null.
So how to achieve that? I first thought of a UDF like this:
def myFunc(col1: String, col2: String, col3: java.lang.Long, col4: java.lang.Long): (String, Long) = {
  // Scala's Long is a value type and can never be null, so the boxed
  // java.lang.Long is needed here to represent nullable columns
  if (col3 == null) {
    (col1 + col2 + "col4", col4)
  } else {
    (col1 + col2 + "col3", col3)
  }
}
val udfMyFunc = udf(myFunc _) // needed to treat it as partially applied function
But how can I call it from the DataFrame in the right way?
Of course, all the code above is rubbish and there could be a much better way. Since I am just juggling the first code snippets, let me know... Comparing the Int value to null is already not working.
Any help is appreciated! Thanks!
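For context on the null comparison problem mentioned in the question: the selection the UDF is trying to express is essentially a coalesce of col3 and col4. A minimal pure-Python sketch of that logic (no Spark involved; the function and column names mirror the question's myFunc, and the sample rows are the question's data):

```python
def my_func(col1, col2, col3, col4):
    """Pure-Python stand-in for the UDF in the question: pick the
    non-null value and build the new column name from its source."""
    if col3 is None:
        return (col1 + col2 + "col4", col4)
    else:
        return (col1 + col2 + "col3", col3)

# the three rows of the initial DataFrame, with None standing in for null
rows = [("A", "X", 6, None), ("B", "Z", None, 5), ("C", "Y", 4, None)]
results = [my_func(*r) for r in rows]
```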
Answer
There is a much simpler way:
val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) //Step 1
.withColumn("value",when($"col3".isNotNull, $"col3").otherwise($"col4")) //Step 2
.groupBy($"col1",$"col2",$"col3",$"col4",$"newCol") //Step 3
.pivot("newCol") // Step 4
.agg(max($"value")) // Step 5
.orderBy($"newCol") // Step 6
.drop($"newCol") // Step 7
df3.show()
The steps are:
- Add a new column which contains the contents of col1 concatenated with col2
- Add a new column, "value", which contains the non-null contents of either col3 or col4
- GroupBy the columns you want
- Pivot on newCol, which contains the values which are now to become column headings
- Aggregate by the max of value, which will be the value itself if the groupBy is single-valued per group; or alternatively .agg(first($"value")) if value happens to be a string rather than a numeric type, since the max function can only be applied to numeric types
- Order by newCol so the DF is in ascending order
- Drop this column as you no longer need it, or skip this step if you want a column of values without nulls
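The steps above can also be imitated without Spark, which may help make the concat/coalesce/pivot chain concrete. A plain-Python sketch (the data and column names come from the question; the group-and-pivot is simulated directly, since each group holds exactly one value here):

```python
# the three rows of the initial DataFrame, with None standing in for null
rows = [("A", "X", 6, None), ("B", "Z", None, 5), ("C", "Y", 4, None)]

# Step 1: newCol = col1 + col2; Step 2: value = coalesce(col3, col4)
augmented = [
    (c1, c2, c3, c4, c1 + c2, c3 if c3 is not None else c4)
    for (c1, c2, c3, c4) in rows
]

# Steps 3-5: pivot on newCol, so its values become column headings;
# max over a single-valued group is just the value itself
headings = sorted({r[4] for r in augmented})  # Step 6: ascending order
pivoted = []
for (c1, c2, c3, c4, new_col, value) in augmented:
    row = {"col1": c1, "col2": c2, "col3": c3, "col4": c4}
    for h in headings:
        row[h] = value if h == new_col else None
    pivoted.append(row)  # Step 7: newCol itself is not kept
```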
Credit due to @user8371915 who helped me answer my own pivot question in the first place.
Here is the result:
+----+----+----+----+----+----+----+
|col1|col2|col3|col4|  AX|  BZ|  CY|
+----+----+----+----+----+----+----+
|   A|   X|   6|null|   6|null|null|
|   B|   Z|null|   5|null|   5|null|
|   C|   Y|   4|null|null|null|   4|
+----+----+----+----+----+----+----+
You might have to play around with the column header string concatenation to get the right result.