How to safely consume Java Streams without isFinite() and isOrdered() methods?


Question

    There is a question on whether Java methods should return Collections or Streams, in which Brian Goetz answers that even for finite sequences, Streams should usually be preferred.

    But it seems to me that currently many operations on Streams that come from other places cannot be safely performed, and defensive code guards are not possible because Streams do not reveal if they are infinite or unordered.

    If parallelism were a problem for the operations I want to perform on a Stream, I could call isParallel() to check, or sequential() to make sure the computation runs sequentially (if I remember to).

    But if orderedness or finiteness (sizedness) is relevant to the safety of my program, I cannot write safeguards.

    Assuming I consume a library implementing this fictitious interface:

    import java.util.stream.Stream;

    public interface CoordinateServer {
        public Stream<Integer> coordinates();
        // example implementations:
        // IntStream.range(0, 100).boxed()   // finite, ordered, sequential
        // final AtomicInteger atomic = new AtomicInteger();
        // Stream.generate(() -> atomic.incrementAndGet()) // infinite, unordered, sequential
        // Stream.generate(() -> atomic.incrementAndGet()).parallel() // infinite, unordered, parallel
    }
    

    Then what operations can I safely call on this stream to write a correct algorithm?

    It seems that if I want to write the elements to a file as a side effect, I need to be concerned about the stream being parallel:

    // if stream is parallel, in which order will elements be written to the file?
    coordinates().peek(i -> writeToFile(i)).count();
    // how should I remember to always add sequential() in such cases?
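To make the concern concrete, here is a minimal sketch of one way to get deterministic side effects regardless of what the library returns. It assumes an in-memory list as a stand-in for the file; the class `OrderedSideEffects`, the method `writeInOrder` and the `sink` list are hypothetical names, not part of the original question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class OrderedSideEffects {
    // Hypothetical stand-in for writeToFile: appends each element to a list.
    static List<Integer> writeInOrder(Stream<Integer> coordinates) {
        List<Integer> sink = new ArrayList<>();
        coordinates
                .sequential()               // process sequentially, whatever mode the library chose
                .forEachOrdered(sink::add); // respects encounter order if the stream has one
        return sink;
    }

    public static void main(String[] args) {
        // A parallel, ordered source, as a library might hand out.
        Stream<Integer> parallelSource = IntStream.range(0, 5).boxed().parallel();
        System.out.println(writeInOrder(parallelSource));
    }
}
```

Using a terminal operation meant for side effects also avoids relying on count() to traverse the pipeline: since Java 9, count() may skip intermediate operations such as peek() when it can compute the size directly from the source.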
    

    And if it is parallel, which thread pool does it run on?

    If I want to sort the stream (or other non-short-circuit operations), I somehow need to be cautious about it being infinite:

    coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
    coordinates().allMatch(x -> x > 0); // will this terminate?
    

    I can impose a limit before sorting, but which magic number should that be, if I expect a finite stream of unknown size?

    Finally maybe I want to compute in parallel to save time and then collect the result:

    // will result list maintain the same order as sequential?
    coordinates().map(i -> complexLookup(i)).parallel().collect(toList());
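For an ordered source, collecting with toList() keeps the encounter order even when the pipeline runs in parallel; the open risk is only the unordered case. The sketch below illustrates the ordered case under stated assumptions: `ParallelOrderDemo` and `lookupAll` are hypothetical names, and `complexLookup` is replaced by a trivial stand-in.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ParallelOrderDemo {
    // Hypothetical stand-in for the expensive lookup in the question.
    static int complexLookup(int i) {
        return i * 2;
    }

    // Runs the same pipeline sequentially or in parallel over an ORDERED source.
    static List<Integer> lookupAll(boolean parallel) {
        Stream<Integer> s = IntStream.range(0, 1_000).boxed()
                .map(ParallelOrderDemo::complexLookup);
        if (parallel) {
            s = s.parallel();
        }
        return s.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Compares the parallel result with the sequential one.
        System.out.println(lookupAll(true).equals(lookupAll(false)));
    }
}
```

With an unordered source such as Stream.generate(...), no such guarantee exists, which is exactly the case the caller cannot detect from the Stream interface alone.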
    

    But if the stream is not ordered (in that version of the library), then the result might become mangled due to the parallel processing. But how can I guard against this, other than not using parallel (which defeats the performance purpose)?

    Collections are explicit about being finite or infinite, about having an order or not, and they do not carry the processing mode or threadpools with them. Those seem like valuable properties for APIs.

    Additionally, Streams may sometimes need to be closed, but most commonly do not. If I consume a stream from a method (or from a method parameter), should I generally call close()?

    Also, streams might already have been consumed, and it would be good to handle that case gracefully, so it would be useful to be able to check whether the stream has already been consumed.

    I would wish for some code snippet that can be used to validate assumptions about a stream before processing it, like:

    Stream<X> stream = fooLibrary.getStream();
    // StreamPreconditions is the wished-for helper, not an existing API
    Stream<X> safeStream = StreamPreconditions(
        stream,
        /* max threshold of elements before IllegalArgumentException */
        10_000,
        /* fail with IllegalArgumentException if not ordered */
        true
        );
    
    

    Solution

    After looking at things a bit (some experimentation and here), as far as I can see there is no way to know definitively whether a stream is finite or not.

    Moreover, sometimes it is not determined until runtime (for example, with takeWhile, available since Java 9: IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x))).
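The runtime-dependence can be sketched as follows. This is an illustration only: the class `RuntimeFiniteness` and its methods are hypothetical, and a simple call counter stands in for `externalCondition`. The stream is finite in practice, yet its spliterator does not report SIZED.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class RuntimeFiniteness {
    // Builds an infinite generator that is cut off after `cutoff` elements.
    // The counter is a stand-in for an external condition known only at runtime.
    static IntStream bounded(int cutoff) {
        AtomicInteger calls = new AtomicInteger();
        return IntStream.generate(() -> 1)
                .takeWhile(x -> calls.incrementAndGet() <= cutoff);
    }

    // Asks the stream's spliterator whether it knows its size up front.
    static boolean reportsSized(int cutoff) {
        return bounded(cutoff).spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    // Actually consumes the stream and counts the elements.
    static long countElements(int cutoff) {
        return bounded(cutoff).count();
    }

    public static void main(String[] args) {
        System.out.println("SIZED reported: " + reportsSized(5));
        System.out.println("elements seen:  " + countElements(5));
    }
}
```

So a missing SIZED characteristic says nothing about whether consumption will terminate; it only says the size is not known in advance.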

    What you can do is:

    1. You can sometimes establish with certainty that it is finite, in a few ways (note that a negative result on these checks does not mean the stream is infinite, only that it may be; also note that spliterator() is a terminal operation, so the stream has to be rebuilt from the spliterator afterwards):

      1. stream.spliterator().getExactSizeIfKnown() - if the stream has a known exact size, it is finite; otherwise this returns -1.

      2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - returns true if the stream is SIZED.

    2. You can safeguard yourself by assuming the worst (depending on your case).

      1. stream.sequential()/stream.parallel() - explicitly set your preferred consumption mode.
      2. With a potentially infinite stream, assume the worst case in each scenario.

        1. For example, assume you want to listen to a stream of tweets until you find one by Venkat. This is a potentially infinite operation, but you do want to wait until such a tweet is found. In this case, simply use stream.filter(tweet -> isByVenkat(tweet)).findAny() - it will iterate until such a tweet comes along (or forever).
        2. A different scenario, and probably the more common one, is wanting to do something with all the elements, or wanting to try only a limited number of times (similar to a timeout). For this, I'd recommend always calling stream.limit(x) before calling your operation (collect, allMatch or similar), where x is the number of elements you are willing to process.
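The checks and safeguards listed above can be combined into something close to the helper the question wishes for. This is a sketch under assumptions, not an existing API: the class `StreamPreconditions`, the method `check` and its parameters are hypothetical names. It inspects the spliterator (which consumes the original stream), rebuilds a stream with StreamSupport.stream, and, unlike limit(x), which truncates silently, fails once more than `maxElements` elements flow through.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class StreamPreconditions {
    private StreamPreconditions() {}

    // Hypothetical helper: returns a guarded replacement for `stream`.
    // The original stream must not be used after this call.
    static <T> Stream<T> check(Stream<T> stream, long maxElements, boolean requireOrdered) {
        boolean parallel = stream.isParallel();
        // spliterator() is a terminal operation on the original stream.
        Spliterator<T> split = stream.spliterator();

        if (requireOrdered && !split.hasCharacteristics(Spliterator.ORDERED)) {
            throw new IllegalArgumentException("stream is not ordered");
        }
        long known = split.getExactSizeIfKnown(); // -1 if unknown
        if (known > maxElements) {
            throw new IllegalArgumentException("stream has " + known + " elements, max is " + maxElements);
        }

        // Rebuild a stream over the same elements, keeping mode and close handlers.
        Stream<T> rebuilt = StreamSupport.stream(split, parallel).onClose(stream::close);
        if (known >= 0) {
            return rebuilt; // size is known and within bounds, no runtime guard needed
        }

        // Size unknown: count lazily and abort instead of running forever.
        AtomicLong seen = new AtomicLong();
        return rebuilt.peek(x -> {
            if (seen.incrementAndGet() > maxElements) {
                throw new IllegalStateException("more than " + maxElements + " elements");
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(check(Stream.of(3, 1, 2), 10, true).sorted().count());
        try {
            check(Stream.generate(() -> 1), 10, false).sorted().count();
        } catch (IllegalStateException e) {
            System.out.println("stopped: " + e.getMessage());
        }
    }
}
```

The eager checks throw IllegalArgumentException as the question asks; the lazy size guard can only fire during consumption, so this sketch uses IllegalStateException there. That split is a design choice of the sketch, not something the original answer prescribes.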

    After all this, I'll just mention that I think returning a stream is generally not a good idea, and I'd try to avoid it unless there are large benefits.
