使用 withColumn 从查找表动态添加新列 [英] adding a new column using withColumn from a lookup table dynamically
问题描述
我在 Java 8 中使用 spark-sql-2.4.1v.我有一个场景,我需要从查找表中动态添加一列.
I am using spark-sql-2.4.1v with Java 8. I have a scenario where I need to dynamically add a column from a look up table.
我有带列的数据框A, B, C , ..., X,Y, Z
I have data frame with columns A, B, C , ..., X,Y, Z
当少数(原始)列(例如:A、B、C)值为空时,我需要采用/替换列(例如:X、Y、Z)值,否则采用原始列值.我将获取此映射信息作为业务逻辑的一部分.如果是这种情况,我将遵循以下硬编码代码
When few (original) columns ( Ex: A,B,C) values are null , i need to take/substitute column( Ex: X,Y,Z) values else take the original column values. I will get this mapping information as part of business logic. If that is the case i will follow something like below hard-coded code
Dataset<Row> substitutedDs = ds
.withColumn("A",
when(col("A").isNull() , col("X").cast(DataTypes.StringType))
.otherwise(col("A").cast(DataTypes.StringType))
)
.withColumn("C",
when(col("C").isNull() , col("Z").cast(DataTypes.StringType))
.otherwise(col("C").cast(DataTypes.StringType))
哪个工作正常.但我需要动态/可配置地执行此操作以避免硬编码.
Which is working fine. But I need to do this dynamically/configurable to avoid hard-coding.
我将查找带有代码"列的查找表和code_substitutes"信息如下
I will get look up table with columns "code" and "code_substitutes" information as below
-------------------------
| Code | Code_Substitute |
-------------------------
A X
B Y
C Z
-------------------------
我需要动态构造上面的替换Ds";,这怎么办?
I need to dynamically construct above "substitutedDs" , how can this be done ?
推荐答案
有了 Java8,你可以使用这个 Stream.reduce() 重载:
With Java8, you can use this Stream.reduce() overload:
final Dataset<Row> dataframe = ...;
final Map<String, String> substitutes = ...;
final Dataset<Row> afterSubstitutions = codeSubstitutes.entrySet().stream()
.reduce(dataframe, (df, entry) ->
df.withColumn(entry.getKey(), when(/* replace with col(entry.getValue()) when null */)),
(left, right) -> { throw new IllegalStateException("Can't merge two dataframes. This stream should not be a parallel one!"); }
);
组合器(最后一个参数)应该合并并行处理的两个数据帧(如果流是 parallel()
流),但我们根本不允许这样做,因为我们是仅在 sequential()
流上调用此逻辑.
The combiner (last argument) is supposed to merge two dataframes processed in parallel (if the stream was a parallel()
stream), but we'll simply not allow that, as we're only invoking this logic on a sequential()
stream.
更具可读性/可维护性的版本涉及将上述逻辑提取到专用方法中的额外步骤,例如:
A more readable/maintainable version involves an extra-step for extracting the above logic into dedicated methods, such as:
// ...
Dataset<Row> nullSafeDf = codeSubstitutes.entrySet().stream()
.reduce(dataframe, this::replaceIfNull, this::throwingCombiner);
// ...
}
private Dataset<Row> replaceIfNull(Dataset<Row> df, Map.Entry<String, String> substitution) {
final String original = substitution.getKey();
final String replacement = substitution.getValue();
return df.withColumn(original, when(col(original).isNull(), col(replacement))
.otherwise(col(original)));
}
private <X> X throwingCombiner(X left, X right) {
throw new IllegalStateException("Combining not allowed");
}
这篇关于使用 withColumn 从查找表动态添加新列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!