如何将Java流中的RuntimeExceptions映射到“恢复”来自无效的流元素 [英] How to map RuntimeExceptions in Java streams to "recover" from invalid stream elements

查看:150
本文介绍了如何将Java流中的RuntimeExceptions映射到“恢复”来自无效的流元素的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

想象一下,我正在构建一个图书馆,它将会收到一个整数流,所有的图书馆代码都需要用字符串表示形式返回一串字符串。

Imagine I'm building a library, that will receive a Stream of Integers, and all the library code needs to do is return a stream of Strings with the string representation of the number.

public Stream<String> convertToString(Stream<Integer> input) {
  return input.map(n -> String.valueOf(n));
}

但是,假设有人决定使用以下代码调用我的功能: p>

However, imagine someone decides to invoke my function with the following code:

Stream<Integer> input = Stream.of(1,2,3)
  .map(n -> {
    if (n %3 ) { throw new RuntimeException("3s are not welcome here!"); }
    else { return n; }
  }

Stream<String> output = convertToString(input);

由于我希望我的API合约成为可以处理这种情况并仍然返回一串串的字符串,我想知道如何更改我的代码,以便检测到流中有一个异常,并通过将其映射到特殊的NaN值,从异常中恢复。

Since I want my API contract to be one that can handle this situation and still return a Stream of Strings, I'd like to know how to change my code so it detects that there's an exception in the stream and "recover" from the exception by mapping it to a special "NaN" value.

最后,我希望我的输出流为1,2,NaN

In the end, I want my output Stream to be "1","2","NaN"

注意:


  • 我不希望我的API必须为输入定义一个特殊的包装器。

  • 由于依赖关系,我不想依赖第三方库。

  • 我不知道流是多大还是以什么速度将被生成,所以我不能使用任何缓存/ p重新加载所有值。

Java 8 Streams可以这样吗?

Is this possible with Java 8 Streams?

有一个迭代器,我想我可以做到:

With an Iterator, I imagine I could have done:

public class SafeIntegerToStringIterator implements Iterator<String> {
  Iterator<Integer> innerIterator;
  ...
  public String next() throws NoSuchElementException {
    try { return String.valueOf(this.innerIterator.next()); }
    catch (RuntimeException e) { return "NaN"; }
  }

}
...
public Iterator<String> convertToString(Iterator<Integer> input) {
  return new SafeIntegerToStringIterator(input);
}

谢谢

推荐答案

注意:请参阅本文末尾的修改,修正了原始答案中的错误。我正在离开原来的答案,因为它在许多情况下仍然有用,我认为这有助于解决OP的问题,至少有一些限制。

Note: Please see the edit at the end of this post, which fixes a bug in my original answer. I'm leaving my original answer anyway, because it's still useful for many cases and I think it helps solve OP's question, at least with some restrictions.

您使用 Iterator 的方法走向正确的方向。该解决方案可能如下所示:将流转换为迭代器,按照已经完成的方式包装迭代器,然后从包装器迭代器创建流,但您应该使用 Spliterator 。以下是代码:

Your approach with Iterator goes in the right direction. The solution might be drafted as follows: convert the stream to an iterator, wrap the iterator as you have already done, and then create a stream from the wrapper iterator, except that you should use a Spliterator instead. Here's the code:

private static <T> Stream<T> asNonThrowingStream(
        Stream<T> stream,
        Supplier<? extends T> valueOnException) {

    // Get spliterator from original stream
    Spliterator<T> spliterator = stream.spliterator();

    // Return new stream from wrapper spliterator
    return StreamSupport.stream(

        // Extending AbstractSpliterator is enough for our purpose
        new Spliterators.AbstractSpliterator<T>(
                spliterator.estimateSize(),
                spliterator.characteristics()) {

            // We only need to implement tryAdvance
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                try {
                    return spliterator.tryAdvance(action);
                } catch (RuntimeException e) {
                    action.accept(valueOnException.get());
                    return true;
                }
            }
        }, stream.isParallel());
}

我们正在扩展 AbstractSpliterator 将包装器返回由原始流。我们只需要实现 tryAdvance 方法,它们可以委托给原始拼写器的 tryAdvance 方法,或者捕获 RuntimeException 并使用提供的 valueOnException 值调用操作。

We are extending AbstractSpliterator to wrap the spliterator returned by the original stream. We only need to implement the tryAdvance method, which either delegates to the original spliterator's tryAdvance method, or catches RuntimeException and invokes the action with the supplied valueOnException value.

Spliterator 的合同规定了 tryAdvance 必须是 true ,如果该消息被消耗,所以如果一个 RuntimeException 被捕获,那意味着原来的拼写已经从自己的 tryAdvance 方法中抛出了它。因此,在这种情况下,我们返回 true ,这意味着元素被消耗了。

Spliterator's contract specifies that the return value of tryAdvance must be true if the action is consumed, so if a RuntimeException is catched, it means that the original spliterator has thrown it from within its own tryAdvance method. Thus, we return true in this case, meaning that the element was consumed anyway.

原始拼写器的估计大小通过将这些值作为参数传递给 AbstractSpliterator 的构造函数来保留特征。

The original spliterator's estimate size and characteristics are preserved by passing these values as arguments to the constructor of AbstractSpliterator.

最后,我们创建一个新的通过 StreamSupport.stream 方法从新的拼接器流。如果原来的流也是平行的,新流是平行的。

Finally, we create a new stream from the new spliterator via the StreamSupport.stream method. The new stream is parallel if the original one was also parallel.

以下是使用上述方法的方法:

Here's how to use the above method:

public Stream<String> convertToString(Stream<Integer> input) {
    return asNonThrowingStream(input.map(String::valueOf), () -> "NaN");
}






编辑



根据 Holger的评论用户holi-java 已经提供了一个解决方案,避免了Holger指出的陷阱。


Edit

As per Holger's comment below, user holi-java has kindly provided a solution that avoids the pitfalls pointed out by Holger.

这是代码:

<T> Stream<T> exceptionally(Stream<T> source, BiConsumer<Exception, Consumer<? super T>> handler) {
    class ExceptionallySpliterator extends AbstractSpliterator<T>
            implements Consumer<T> {

        private Spliterator<T> source;
        private T value;
        private long fence;

        ExceptionallySpliterator(Spliterator<T> source) {
            super(source.estimateSize(), source.characteristics());
            this.fence = source.getExactSizeIfKnown();
            this.source = source;
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> it = source.trySplit();
            return it == null ? null : new ExceptionallySpliterator(it);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return fence != 0 && consuming(action);
        }

        private boolean consuming(Consumer<? super T> action) {
            Boolean state = tryConsuming(action);
            if (state == null) {
                return true;
            }
            if (state) {
                action.accept(value);
                value = null;
                return true;
            }
            return false;
        }


        private Boolean tryConsuming(Consumer<? super T> action) {
            fence--;
            try {
                return source.tryAdvance(this);
            } catch (Exception ex) {
                handler.accept(ex, action);
                return null;
            }
        }

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    return stream(new ExceptionallySpliterator(source.spliterator()), source.isParallel()).onClose(source::close);
}

请参考到测试,如果你想进一步了解这个解决方案。

Please refer to the tests if you want to further know about this solution.

这篇关于如何将Java流中的RuntimeExceptions映射到“恢复”来自无效的流元素的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆