从查询表中使用withColumn动态添加新列 [英] adding a new column using withColumn from a lookup table dynamically

查看:791
本文介绍了从查询表中使用withColumn动态添加新列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在Java 8中使用spark-sql-2.4.1v.我有一种情况,需要从查找表中动态添加列.

I am using spark-sql-2.4.1v with Java 8. I have a scenario where I need to dynamically add a column from a look up table.

我有带有列的数据框 A,B,C,...,X,Y,Z

I have data frame with columns A, B, C , ..., X,Y, Z

当几个(原始)列(例如:A,B,C)的值为null时,我需要采用/替换列(例如:X,Y,Z)的值,否则采用原始列的值. 我将获得此映射信息作为业务逻辑的一部分. 如果是这种情况,我将遵循以下硬编码代码

When few (original) columns ( Ex: A,B,C) values are null , i need to take/substitute column( Ex: X,Y,Z) values else take the original column values. I will get this mapping information as part of business logic. If that is the case i will follow something like below hard-coded code

 Dataset<Row>  substitutedDs = ds
                  .withColumn("A",
                             when(col("A").isNull() , col("X").cast(DataTypes.StringType))
                             .otherwise(col("A").cast(DataTypes.StringType))
                          )
                  .withColumn("C",
                             when(col("C").isNull() , col("Z").cast(DataTypes.StringType))
                             .otherwise(col("C").cast(DataTypes.StringType))
                         

哪个工作正常.但是我需要动态/可配置来避免硬编码.

Which is working fine. But I need to do this dynamically/configurable to avoid hard-coding.

我将查找带有"code"列的查询表.和"code_substitutes"信息如下

I will get look up table with columns "code" and "code_substitutes" information as below

-------------------------
| Code | Code_Substitute |
-------------------------
  A         X
  B         Y
  C         Z
-------------------------

我需要动态地构建上面的取代的D". ,该怎么办?

I need to dynamically construct above "substitutedDs" , how can this be done ?

推荐答案

对于Java8,您可以使用

With Java8, you can use this Stream.reduce() overload:

final Dataset<Row> dataframe = ...;
final Map<String, String> substitutes = ...;

final Dataset<Row> afterSubstitutions = codeSubstitutes.entrySet().stream()
    .reduce(dataframe, (df, entry) ->
            df.withColumn(entry.getKey(), when(/* replace with col(entry.getValue()) when null */)),
            (left, right) -> { throw new IllegalStateException("Can't merge two dataframes. This stream should not be a parallel one!"); }
    );

合并器(最后一个参数)应该合并两个并行处理的数据帧(如果流是parallel()流),但是我们根本不允许这样做,因为我们仅在sequential()流.

The combiner (last argument) is supposed to merge two dataframes processed in parallel (if the stream was a parallel() stream), but we'll simply not allow that, as we're only invoking this logic on a sequential() stream.

更具可读性/可维护性的版本涉及将上述逻辑提取到专用方法中的额外步骤,例如:

A more readable/maintainable version involves an extra-step for extracting the above logic into dedicated methods, such as:

    // ...
    Dataset<Row> nullSafeDf = codeSubstitutes.entrySet().stream()
        .reduce(dataframe, this::replaceIfNull, this::throwingCombiner);
    // ...
}


private Dataset<Row> replaceIfNull(Dataset<Row> df, Map.Entry<String, String> substitution) {
    final String original = substitution.getKey();
    final String replacement = substitution.getValue();
    return df.withColumn(original, when(col(original).isNull(), col(replacement))
            .otherwise(col(original)));
}

private <X> X throwingCombiner(X left, X right) {
    throw new IllegalStateException("Combining not allowed");
}

这篇关于从查询表中使用withColumn动态添加新列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆