注册流“完成”钩 [英] Register a Stream "completion" hook

查看:73
本文介绍了注册流“完成”钩的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用Java 8 Stream API,我想注册一个完成挂钩,行如下:

Using the Java 8 Stream API, I would like to register a "completion hook", along the lines of:

Stream<String> stream = Stream.of("a", "b", "c");

// additional filters / mappings that I don't control
stream.onComplete((Completion c) -> {
    // This is what I'd like to do:
    closeResources();

    // This might also be useful:
    Optional<Throwable> exception = c.exception();
    exception.ifPresent(e -> throw new ExceptionWrapper(e));
});

我之所以这样做是因为我想将资源包装在<$ c中$ c> Stream 供API客户端使用,我希望 Stream 在消耗后自动清理资源。如果可能,那么客户可以致电:

The reason why I want to do that is because I want to wrap a resource in a Stream for API clients to consume, and I want that Stream to clean up the resource automatically once it is consumed. If that were possible, then the client could call:

Collected collectedInOneGo =
Utility.something()
       .niceLookingSQLDSL()
       .moreDSLFeatures()
       .stream()
       .filter(a -> true)
       .map(c -> c)
       .collect(collector);

而不是目前所需要的:

try (Stream<X> meh = Utility.something()
                            .niceLookingSQLDSL()
                            .moreDSLFeatures()
                            .stream()) {

    Collected collectedWithUglySyntacticDissonance =
    meh.filter(a -> true)
       .map(c -> c)
       .collect(collector);
}

理想情况下,我想进入 java.util.stream.ReferencePipeline 的各种方法,例如:

Ideally, I'd like to get into the java.util.stream.ReferencePipeline's various methods, such as:

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    try {

        // Existing loop
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

    // These would be nice:
    catch (Throwable t) {
        completion.onFailure(t);
    }
    finally {
        completion.onSuccess();
    }
}

现有的JDK是否有一种简单的方法可以做到这一点8 API?

Is there an easy way to do this with existing JDK 8 API?

推荐答案

任何拦截终端操作的解决方案,除了 flatMap - 基于解决方案(由@Holger提出)对于以下代码将是脆弱的:

Any solution intercepting the terminal operations except flatMap-based solution (as proposed by @Holger) would be fragile to the following code:

Stream<String> stream = getAutoCloseableStream();
if(stream.iterator().hasNext()) {
    // do something if stream is non-empty
}

此类使用在规范中绝对合法。不要忘记 iterator() spliterator()是终端流操作,但在执行后你还需要一个访问流源。在任何状态下放弃 Iterator Spliterator 也是完全有效的,所以你不知道它是否会被使用进一步与否。

Such usage is absolutely legal by the specification. Do not forget that iterator() and spliterator() are terminal stream operations, but after their execution you still need an access to the stream source. Also it's perfectly valid to abandon the Iterator or Spliterator in any state, so you just cannot know whether it will be used further or not.

您可以考虑建议用户不要使用 iterator() spliterator(),但这段代码呢?

You may consider advicing users not to use iterator() and spliterator(), but what about this code?

Stream<String> stream = getAutoCloseableStream();
Stream.concat(stream, Stream.of("xyz")).findFirst();

这内部使用 spliterator()。tryAdvance()对于第一个流,然后放弃它(尽管在显式调用结果流 close()时关闭)。您还需要让用户不要使用 Stream.concat 。据我所知,你的库里面你经常使用 iterator() / spliterator(),所以你需要重新考虑所有这些地方可能出现的问题。当然,还有很多其他库也使用 iterator() / spliterator()并且可能会短之后的电路:所有这些都与你的功能不兼容。

This internally uses spliterator().tryAdvance() for the first stream, then abandons it (though closes if the resulting stream close() is called explicitly). You will need to ask your users not to use Stream.concat as well. And as far as I know internally in your library you are using iterator()/spliterator() pretty often, so you will need to revisit all these places for possible problems. And, of course there are plenty of other libraries which also use iterator()/spliterator() and may short-circuit after that: all of them would become incompatible with your feature.

为什么 flatMap 基于此的解决方案在这里工作?因为在第一次调用 hasNext() tryAdvance()时,它会转储整个将内容流式传输到中间缓冲区并关闭原始流源。因此,根据流大小,您可能会浪费很多中间内存,甚至有 OutOfMemoryError

Why flatMap-based solution works here? Because upon the first call of the hasNext() or tryAdvance() it dumps the entire stream content into the intermediate buffer and closes the original stream source. So depending on the stream size you may waste much intermediate memory or even have OutOfMemoryError.

您也可以考虑保留 PhantomReference Stream 对象并监控 ReferenceQueue 。在这种情况下,完成将由垃圾收集器触发(这也有一些缺点)。

You may also consider keeping the PhantomReferences to the Stream objects and monitoring the ReferenceQueue. In this case the completion will be triggered by garbage collector (which also has some drawbacks).

总之,我的建议是继续使用try-with-resources。

In conclusion my advice is to stay with try-with-resources.

这篇关于注册流“完成”钩的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆