Spark: Applying UDF to Dataframe Generating new Columns based on Values in DF


Problem Description

I am having problems transposing values in a DataFrame in Scala. My initial DataFrame looks like this:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   A|   X|   6|null|
|   B|   Z|null|   5|
|   C|   Y|   4|null|
+----+----+----+----+

col1 and col2 are of type String, while col3 and col4 are Int.
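
For reference, here is a minimal sketch that builds this sample DataFrame. The session setup is an assumption, and the frame is named df2 only because the answer below refers to it by that name:

val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[*]")
  .appName("pivot-example")
  .getOrCreate()

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Nullable Int columns need an explicit schema with IntegerType
val schema = StructType(Seq(
  StructField("col1", StringType),
  StructField("col2", StringType),
  StructField("col3", IntegerType, nullable = true),
  StructField("col4", IntegerType, nullable = true)
))

val df2 = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(
    Row("A", "X", 6, null),
    Row("B", "Z", null, 5),
    Row("C", "Y", 4, null)
  )),
  schema
)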

The result should look like this:

+----+----+----+----+------+------+------+
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol4|
+----+----+----+----+------+------+------+
|   A|   X|   6|null|     6|  null|  null|
|   B|   Z|null|   5|  null|     5|  null|
|   C|   Y|   4|   4|  null|  null|     4|
+----+----+----+----+------+------+------+

That means the three new columns should be named after col1, col2 and the column the value is extracted from. The extracted value comes from col3 or col4, depending on which value is not null.

So how can that be achieved? I first thought of a UDF like this:

// Picks the non-null of col3/col4 and builds the new column name.
// Boxed java.lang.Long is used so that null can actually be represented;
// a primitive Scala Long can never compare equal to null.
def myFunc(col1: String, col2: String, col3: java.lang.Long, col4: java.lang.Long): (String, java.lang.Long) = {
  if (col3 == null) {
    (col1 + col2 + "col4", col4)
  } else {
    (col1 + col2 + "col3", col3)
  }
}

val udfMyFunc = udf(myFunc _) // needed to treat it as a partially applied function

But how can I call it from the DataFrame in the right way?

Of course, all the code above is rubbish and there could be a much better way. Since I am just juggling with the first code snippets, let me know... Comparing the Int value to null already does not work.
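
For what it's worth, a tuple-returning UDF along these lines could be wired in as sketched below. This is an assumption, not part of the original post: the UDF is redefined as an anonymous function with boxed java.lang.Long arguments (which sidesteps the Int-vs-null problem), and the tuple result comes back as a struct column whose fields can be pulled apart:

import org.apache.spark.sql.functions._
import spark.implicits._

val udfMyFunc = udf((col1: String, col2: String, col3: java.lang.Long, col4: java.lang.Long) =>
  if (col3 == null) (col1 + col2 + "col4", col4)
  else (col1 + col2 + "col3", col3)
)

val withPair = df2
  .withColumn("tmp", udfMyFunc($"col1", $"col2", $"col3".cast("long"), $"col4".cast("long")))
  .withColumn("newColumn", $"tmp".getField("_1")) // first tuple element
  .withColumn("rowValue", $"tmp".getField("_2"))  // second tuple element
  .drop("tmp")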

Any help is appreciated! Thanks!

Answer

There is a simpler way:

val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) //Step 1
          .withColumn("value",when($"col3".isNotNull, $"col3").otherwise($"col4")) //Step 2
          .groupBy($"col1",$"col2",$"col3",$"col4",$"newCol") //Step 3
          .pivot("newCol") // Step 4
          .agg(max($"value")) // Step 5
          .orderBy($"newCol") // Step 6
          .drop($"newCol") // Step 7

      df3.show()

The steps are as follows:

  1. Add a new column which contains the contents of col1 concatenated with col2
  2. Add a new column, "value", which contains the non-null contents of either col3 or col4
  3. Group by the columns you want to keep
  4. Pivot on newCol, which contains the values that are now to become column headings
  5. Aggregate by the max of value, which will be the value itself if the groupBy is single-valued per group; or alternatively use .agg(first($"value")) if value happens to be a string rather than a numeric type, since the max function can only be applied to a numeric type (see the sketch after this list)
  6. Order by newCol so the DF is in ascending order
  7. Drop newCol, as you no longer need it; skip this step if you want a column of values without nulls
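
For the string-valued case mentioned in step 5, here is a hedged sketch of swapping max for first. The sample data is hypothetical, only to illustrate the variant, and it reuses the spark session from the earlier sketch:

import spark.implicits._
import org.apache.spark.sql.functions._

// Hypothetical string-valued data (not from the original post)
val dfStr = Seq(
  ("A", "X", "six", null),
  ("B", "Z", null, "five")
).toDF("col1", "col2", "col3", "col4")

val pivotedStr = dfStr
  .withColumn("newCol", concat($"col1", $"col2"))
  .withColumn("value", when($"col3".isNotNull, $"col3").otherwise($"col4"))
  .groupBy($"col1", $"col2", $"col3", $"col4", $"newCol")
  .pivot("newCol")
  .agg(first($"value")) // one value per group, no numeric aggregation needed
  .orderBy($"newCol")
  .drop($"newCol")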

Credit due to @user8371915, who helped me answer my own pivot question in the first place.

Here is the result:

+----+----+----+----+----+----+----+
|col1|col2|col3|col4|  AX|  BZ|  CY|
+----+----+----+----+----+----+----+
|   A|   X|   6|null|   6|null|null|
|   B|   Z|null|   5|null|   5|null|
|   C|   Y|   4|   4|null|null|   4|
+----+----+----+----+----+----+----+

You might have to play around with the column header string concatenation to get the right result.
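
One hedged possibility (an assumption, not part of the original answer) is to fold the source column's name into the header before pivoting, so the pivot produces headings like AXcol3 instead of AX:

import org.apache.spark.sql.functions._
import spark.implicits._

val df4 = df2
  .withColumn("newCol",
    concat($"col1", $"col2",
           when($"col3".isNotNull, lit("col3")).otherwise(lit("col4"))))
  .withColumn("value", coalesce($"col3", $"col4")) // equivalent to the when/otherwise above
  .groupBy($"col1", $"col2", $"col3", $"col4", $"newCol")
  .pivot("newCol")
  .agg(max($"value"))
  .orderBy($"newCol")
  .drop($"newCol")

df4.show()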
