How can I match double rows with a threshold in Spark?


Question

I have a very simple dataframe:

+--+------+
|Id|Amount|
+--+------+
|0 |3.47  |
|1 |-3.47 |
|2 |3.47  |
|3 |3.47  |
|4 |2.02  |
|5 |-2.01 |
|6 |-2.01 |
|7 |7.65  |
|8 |7.65  |
+--+------+

I'd like to match lines that cancel each other out given a threshold value (let's say 0.5). So in this case, match lines 0 and 1, and lines 4 and 5, and return lines 2 and 3 (along with the other unmatched lines, 6, 7 and 8). There are several valid pairings; returning lines 0 and 2 instead of 2 and 3 is also fine.

The general idea is that they should be matched 2 by 2 and the leftovers returned. It should return nothing if every line has a match and should return all lines that couldn't be paired that way.

Any idea how to do this?

Expected result:

+--+------+
|Id|Amount|
+--+------+
|0 |3.47  |
|2 |3.47  |
|6 |-2.01 |
|7 |7.65  |
|8 |7.65  |
+--+------+

I've been thinking about using a UserDefinedAggregateFunction, but I'm not sure whether it's enough, especially because I think it can only return one value per group of lines.

Answer

I went with a UDF. Writing UDFs in Java is seriously overcomplicated...

If anyone can find a way to simplify this mess, please post or comment.

// UDF that receives all rows of a group as an array and returns the rows
// that could not be paired off. THRESHOLD is a constant defined elsewhere.
private UDF1<WrappedArray<Row>, Row[]> matchData() {
    return (data) -> {
        List<Data> dataList = JavaConversions.seqAsJavaList(data).stream().map(Data::fromRow).collect(Collectors.toList());
        Set<Data> matched = new HashSet<>();

        for (Data element : dataList) {
            if (matched.contains(element)) continue;

            // Among the still-unmatched candidates with the opposite sign,
            // pick the one whose sum with this element is closest to zero,
            // and pair the two if that sum is within THRESHOLD.
            dataList.stream().filter(e -> !matched.contains(e) && e != element)
                    .filter(e -> Math.abs(e.getAmount() + element.getAmount()) < THRESHOLD
                            && Math.signum(e.getAmount()) != Math.signum(element.getAmount()))
                    .min(Comparator.comparingDouble(e -> Math.abs(e.getAmount() + element.getAmount())))
                    .ifPresent(e -> {
                        matched.add(e);
                        matched.add(element);
                    });
        }

        // If anything is left unpaired, return the whole group; otherwise
        // return an empty array.
        if (matched.size() != dataList.size()) {
            return dataList.stream().map(Data::toRow).toArray(Row[]::new);
        } else {
            return new Row[0];
        }
    };
}

With the Data class (using Lombok):

@AllArgsConstructor
@EqualsAndHashCode
@Data
public final class Data {
    private String name;
    private Double amount;

    // Note: the field names here must match the struct fields of the
    // incoming rows; the example dataframe above uses Id/Amount instead.
    public static Data fromRow(Row r) {
        return new Data(
                r.getString(r.fieldIndex("name")),
                r.getDouble(r.fieldIndex("amount")));
    }

    public Row toRow() {
        return RowFactory.create(name, amount);
    }
}

I'm returning the whole set in case it didn't work; this is actually what I need in my case.
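
For reference, here is a minimal sketch of how such a UDF might be registered and applied. The names spark, df, the column names, and the registered name "matchData" are illustrative assumptions, not part of the original answer:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.*;

// Schema of the rows the UDF returns; it must mirror Data.toRow().
StructType dataSchema = new StructType()
        .add("name", DataTypes.StringType)
        .add("amount", DataTypes.DoubleType);

// Register the UDF so it can be called from the DataFrame API.
spark.udf().register("matchData", matchData(),
        DataTypes.createArrayType(dataSchema));

// Collect all rows into a single array, run the pairing, and explode
// the unmatched rows back into individual rows.
Dataset<Row> leftovers = df
        .agg(collect_list(struct(col("name"), col("amount"))).as("rows"))
        .select(explode(callUDF("matchData", col("rows"))).as("unmatched"))
        .select("unmatched.*");

Note that collecting everything into one array only works while a group fits in memory on a single executor; for larger data you would typically groupBy some bucketing key first.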
