What is the expected behavior when a Java 8 Stream throws a RuntimeException?


Problem description


When encountering a RuntimeException during stream processing, should the stream processing abort? Should it first finish? Should the exception be rethrown on Stream.close()? Is the exception rethrown as-is, or is it wrapped? The JavaDoc of Stream and of the java.util.stream package has nothing to say about it.

All questions on Stackoverflow that I have found seem to be focused on how to wrap a checked exception from within a functional interface in order to make their code compile. Indeed, blog posts and similar articles on the Internet all focus on the same caveat. This is not my concern.

I know from my own experience that the processing of sequential streams will abort as soon as a RuntimeException is thrown, and this exception is rethrown as-is. This is the same for parallel streams only if the exception was thrown by the client's thread.
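The sequential case is easy to reproduce with a minimal, self-contained sketch (class name, exception message, and counts are illustrative, not taken from the original example code):

```java
import java.util.stream.IntStream;

public class SequentialAbort {
    static int processed = 0;

    public static void main(String[] args) {
        try {
            IntStream.range(0, 1_000_000).forEach(i -> {
                processed++;
                if (i == 5) throw new IllegalStateException("boom at " + i);
            });
        } catch (IllegalStateException e) {
            // Sequential: the exception arrives unwrapped on the calling thread,
            // and processing stopped immediately after the throwing element.
            System.out.println("caught: " + e.getMessage() + ", processed=" + processed);
        }
    }
}
```

Running this prints `caught: boom at 5, processed=6` — the stream aborts right after the sixth element, and the original exception reaches the caller unchanged.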

However, the example code put here demonstrates that if the exception was thrown by a "worker thread" (= not the same thread as the one invoking the terminal operation) during parallel stream processing, then this exception will be lost forever and the stream processing completes.

The example code will first run an IntStream in parallel. Then a "normal" Stream in parallel.

The example will show that:

1) IntStream has no problem aborting parallel processing if a RuntimeException is encountered. The exception is re-thrown, wrapped in another RuntimeException.
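The first observation — that an exception thrown inside a parallel IntStream.range pipeline does reach the calling thread — can be reproduced with a small sketch. Depending on the JDK, ForkJoinTask may deliver the original exception or a same-class copy with the original as its cause, so the sketch only reports the exception's class (names are illustrative):

```java
import java.util.stream.IntStream;

public class ParallelRethrow {
    public static void main(String[] args) {
        try {
            IntStream.range(0, 1_000_000).parallel().forEach(i -> {
                if (i == 500_000) throw new IllegalStateException("boom");
            });
        } catch (RuntimeException e) {
            // The failure is propagated to the thread that invoked the terminal
            // operation, possibly as a same-class copy wrapping the original.
            System.out.println("caught " + e.getClass().getSimpleName());
        }
    }
}
```

Either way the catch block runs and reports `IllegalStateException`, matching the behavior described for the sized IntStream.range pipeline.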

2) Stream does not play as nice. In fact, the client thread will never see a trace of the thrown RuntimeException. Not only does the stream finish processing; more elements than limit() specified will be processed!

In the example code, IntStream is generated using IntStream.range(). The "normal" Stream has no notion of a "range" and is instead made up of 1:s, but Stream.limit() is called to limit the stream to one billion elements.

Here's another twist. The example code that produces the IntStream does something like this:

IntStream.range(0, 1_000_000_000).parallel().forEach(..)

Change that to a generated stream just like the second example in the code:

IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)

The outcome for this IntStream is the same: the exception is wrapped and rethrown, and the processing aborts. But the second stream will now also wrap and rethrow the exception and not process more elements than the limit! Thus: changing how the first stream is produced has a side effect on how the second stream behaves. To me, this is very odd.

The JavaDoc of ForkJoinPool.invoke() and ForkJoinTask say that exceptions are rethrown, and this is what I would have expected from a parallel stream.

Background

I encountered this "problem" when processing elements in a parallel stream taken from Collection.stream().parallel() (I haven't verified the behavior of Collection.parallelStream(), but it should be the same). What happened was that a "worker thread" crashed and then went silently away while all other threads completed the stream successfully. My app uses a default exception handler that writes the exception to a log file. But not even this log file was created. The thread and its exception simply disappeared. Since I need to abort as soon as a runtime exception is caught, one alternative is to write code that leaks this exception to the other workers, making them unwilling to proceed if an exception has been thrown by any other thread. Of course, this does not guarantee that the stream implementation won't simply keep spawning new threads trying to complete the stream. So I will probably end up not using parallel streams and instead do "normal" concurrent programming using a thread pool/executor.
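The "leak the exception to the other workers" workaround mentioned above can be sketched with an AtomicReference acting as a shared failure flag. This is a sketch of the idea, not the original code; all names are hypothetical, and as noted it does not guarantee that the stream implementation stops scheduling work:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

public class FailFastWorkers {
    public static void main(String[] args) {
        AtomicReference<RuntimeException> failure = new AtomicReference<>();
        try {
            Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8).parallelStream().forEach(i -> {
                if (failure.get() != null) return;  // another worker failed: skip work
                if (i == 4) {
                    RuntimeException e = new IllegalStateException("boom");
                    failure.compareAndSet(null, e); // publish the failure to all workers
                    throw e;
                }
                // ... real per-element work would go here ...
            });
        } catch (RuntimeException e) {
            System.out.println("caught " + e.getClass().getSimpleName());
        }
    }
}
```

Each worker checks the shared flag before doing work, so once any element throws, the remaining workers bail out quickly instead of completing the stream obliviously.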

This shows that the problem of lost runtime exceptions is not isolated to streams generated by Stream.generate() or streams using Stream.limit(). And the bottom line is that I would love to know: what is the expected behavior?

Solution

There is no difference in the behavior of these two streams regarding exception reporting, the problem is that you put both tests one after another into one method and let them access shared data structures.

There is a subtle, perhaps not sufficiently documented (if intended) behavior: when a stream operation completes exceptionally, it does not wait for the completion of all concurrent operations.

So when you catch the exception of the first stream operation, there are still some threads running and accessing your shared data. So when you reset your AtomicBoolean, one of these threads belonging to the first job will read the false value, turn it to true, print the message and throw an exception which gets lost, as the stream operation already completed exceptionally. Further, some of these threads will raise your counter after you reset it, that’s why it has a higher number than the second job would allow. Your second job does not complete exceptionally, as all threads belonging to the second job will read a true value from the AtomicBoolean.

There are some ways to spot this.

When you remove the first stream operation, the second will complete exceptionally as expected. Also, inserting the statement

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

between the two stream operations will fix the problem, as it waits for the completion of all threads.
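The effect of the awaitQuiescence fix can be sketched as follows: after the terminal operation has completed exceptionally, waiting for the common pool to go quiescent ensures that no abandoned worker task will touch shared state afterwards (class name and element counts are illustrative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class QuiescenceFix {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        try {
            IntStream.range(0, 100_000).parallel().forEach(i -> {
                counter.incrementAndGet();
                if (i == 0) throw new IllegalStateException("boom");
            });
        } catch (RuntimeException e) {
            // The terminal operation has returned, but abandoned worker tasks
            // may still be running and incrementing the shared counter.
        }
        // Wait until all common-pool workers are idle before trusting shared state.
        ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
        int afterQuiescence = counter.get();
        try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        // No late increments can arrive once the pool is quiescent.
        System.out.println("stable=" + (counter.get() == afterQuiescence));
    }
}
```

Without the awaitQuiescence call, a read of the counter taken right inside the catch block could still race with the leftover tasks; after it, the value is final.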

However, the cleaner solution would be to let both stream operations use their own counter and flag.

That said, there is a subtle, implementation-dependent difference that causes the problem to disappear if you just swap the two operations. The IntStream.range operation produces a stream with a known size, which allows splitting it into concurrent tasks which intrinsically know how many elements to process. This allows abandoning these tasks in the exceptional case as described above. On the other hand, combining an infinite stream as returned by generate with limit does not produce a sized stream (though that would be possible). Since such a stream is treated as having an unknown size, the subtasks have to synchronize on a counter to ensure that the limit is obeyed. This causes the sub-tasks to (sometimes) complete, even in the exceptional case. But as said, that is a side effect of an implementation detail, not an intentional wait for completion. And since it's about concurrency, the result might be different if you run it multiple times.
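The sized/unsized distinction described above can be observed directly through the spliterator characteristics. This is a sketch probing an implementation detail of the JDK, so the exact flags are not guaranteed by the specification:

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

public class SizedCheck {
    public static void main(String[] args) {
        Spliterator.OfInt ranged = IntStream.range(0, 1_000).spliterator();
        Spliterator.OfInt limited = IntStream.generate(() -> 1).limit(1_000).spliterator();
        // range() reports SIZED: each split knows exactly how many elements it covers.
        System.out.println("rangeSized=" + ranged.hasCharacteristics(Spliterator.SIZED));
        // generate()+limit() does not report SIZED, so subtasks must
        // synchronize on a counter to enforce the limit.
        System.out.println("limitedSized=" + limited.hasCharacteristics(Spliterator.SIZED));
    }
}
```

On current JDKs this prints `rangeSized=true` and `limitedSized=false`, matching the explanation of why the two pipelines behave differently when a task fails.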
