How can I match double rows with a threshold in Spark?
Problem description
I have a very simple dataframe:
+--+------+
|Id|Amount|
+--+------+
|0 |3.47 |
|1 |-3.47 |
|2 |3.47 |
|3 |3.47 |
|4 |2.02 |
|5 |-2.01 |
|6 |-2.01 |
|7 |7.65 |
|8 |7.65 |
+--+------+
I'd like to match lines that cancel each other out given a threshold (let's say 0.5). So in this case, match lines 0 and 1 and lines 4 and 5, and return lines 2 and 3. There are several valid solutions here; returning lines 0 and 2 instead is also fine.
The general idea is that lines should be matched two by two and the leftovers returned. It should return nothing if every line has a match, and it should return all lines that couldn't be paired that way.
Any ideas?
Expected result:
+--+------+
|Id|Amount|
+--+------+
|0 |3.47 |
|2 |3.47 |
|6 |-2.01 |
|7 |7.65 |
|8 |7.65 |
+--+------+
I've been thinking about using a UserDefinedAggregateFunction, but I'm not sure it would be enough, especially since I think it can only return one value per group of lines.
Accepted answer
I went with a UDF. Writing UDFs in Java is seriously overcomplicated... If anyone sees a way to simplify this mess, please comment.
private UDF1<WrappedArray<Row>, Row[]> matchData() {
    return (data) -> {
        // Convert the Scala sequence to a Java list of Data objects
        List<Data> dataList = JavaConversions.seqAsJavaList(data).stream()
                .map(Data::fromRow)
                .collect(Collectors.toList());
        Set<Data> matched = new HashSet<>();
        for (Data element : dataList) {
            if (matched.contains(element)) continue;
            // Greedily pair this element with the closest unmatched element
            // of opposite sign whose sum falls within THRESHOLD
            dataList.stream()
                    .filter(e -> !matched.contains(e) && e != element)
                    .filter(e -> Math.abs(e.getAmount() + element.getAmount()) < THRESHOLD
                            && Math.signum(e.getAmount()) != Math.signum(element.getAmount()))
                    .min(Comparator.comparingDouble(e -> Math.abs(e.getAmount() + element.getAmount())))
                    .ifPresent(e -> {
                        matched.add(e);
                        matched.add(element);
                    });
        }
        // If anything remained unpaired, return the whole group; otherwise return nothing
        if (matched.size() != dataList.size()) {
            return dataList.stream().map(Data::toRow).toArray(Row[]::new);
        } else {
            return new Row[0];
        }
    };
}
With the Data class (using Lombok):
@AllArgsConstructor
@EqualsAndHashCode
@Data
public final class Data {
    private String name;
    private Double amount;

    public static Data fromRow(Row r) {
        return new Data(
                r.getString(r.fieldIndex("name")),
                r.getDouble(r.fieldIndex("amount")));
    }

    public Row toRow() {
        return RowFactory.create(name, amount);
    }
}
I'm returning the whole set when the matching didn't fully work; that's actually what I need in my case.
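The greedy pairing at the heart of the UDF can be exercised outside Spark. Below is a minimal plain-Java sketch of the same logic; the `PairMatcher` class, its `leftovers` method, and the choice to return only the unpaired ids (rather than the whole set, as the UDF does) are my own additions for illustration, not part of the original answer:

```java
import java.util.*;
import java.util.stream.*;

public class PairMatcher {
    static final double THRESHOLD = 0.5;

    // Returns the indices of rows that could not be cancelled pairwise:
    // for each unmatched row, greedily pair it with the closest unmatched
    // row of opposite sign whose sum is within THRESHOLD of zero.
    static List<Integer> leftovers(double[] amounts) {
        Set<Integer> matched = new HashSet<>();
        for (int i = 0; i < amounts.length; i++) {
            if (matched.contains(i)) continue;
            final int cur = i;
            IntStream.range(0, amounts.length)
                    .filter(j -> j != cur && !matched.contains(j))
                    .filter(j -> Math.signum(amounts[j]) != Math.signum(amounts[cur])
                            && Math.abs(amounts[j] + amounts[cur]) < THRESHOLD)
                    .boxed()
                    .min(Comparator.comparingDouble(j -> Math.abs(amounts[j] + amounts[cur])))
                    .ifPresent(j -> { matched.add(j); matched.add(cur); });
        }
        // Everything never added to the matched set is a leftover
        return IntStream.range(0, amounts.length)
                .filter(j -> !matched.contains(j))
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        double[] amounts = {3.47, -3.47, 3.47, 3.47, 2.02, -2.01, -2.01, 7.65, 7.65};
        // Pairs (0,1) and (4,5); ids 2, 3, 6, 7, 8 remain unpaired
        System.out.println(PairMatcher.leftovers(amounts));
    }
}
```

Note that the greedy scan is order-dependent: with the sample data it pairs id 1 with id 0 rather than with id 2 or 3, which is one of the equally valid solutions the question allows.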