How to divide one CompletableFuture into many CompletableFutures in a stream?

Question

For example, I have these methods:

public CompletableFuture<Page> getPage(int i) {
    ...
}
public CompletableFuture<Document> getDocument(int i) {
    ...
}
public CompletableFuture<Void> parseLinks(Document doc) {
    ...
}

My flow:

List<CompletableFuture<Void>> list = IntStream
    .range(0, 10)
    .mapToObj(i -> getPage(i))

    // I want a method like this (thenApplyAndSplit() does not exist on Stream):
    .thenApplyAndSplit((Page page) -> {
        List<CompletableFuture<Document>> docs = page.getDocsId()
            .stream()
            .map(i -> getDocument(i))
            .collect(Collectors.toList());
        return docs;
    })
    .map((CompletableFuture<Document> future) -> {
        return future.thenCompose((Document doc) -> parseLinks(doc));
    })
    .collect(Collectors.toList());

It should be something like flatMap() for CompletableFuture, so I want to implement this flow:

List<Integer> -> Stream<CompletableFuture<Page>>
              -> Stream<CompletableFuture<Document>>
              -> parse each

UPDATE

Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

Stream<CompletableFuture<Document>> documentCFS = pagesCFS.flatMap(page -> {
    // How do I return a stream of Documents once the page finishes?
    // page.thenApply( ... )
});


Recommended answer

I also wanted to give a shot at implementing a Spliterator for streams of CompletableFutures, so here is my attempt.

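Note that if you are using this in parallel mode, you should use a different ForkJoinPool for the stream than for the tasks running behind the CompletableFutures. The stream blocks while it waits for the futures to complete, so if both share the same executor you could actually lose performance, or even run into deadlocks. This is the default situation, because parallel streams and CompletableFuture.supplyAsync() without an explicit executor both normally run on ForkJoinPool.commonPool().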
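A minimal sketch of that separation, not taken from the original answer: the futures get their own fixed thread pool, and the parallel stream that blocks on join() runs inside a dedicated ForkJoinPool. The class and method names (SeparatePools, sumOfSquares) are illustrative. Submitting the terminal operation from a task in a custom ForkJoinPool makes the parallel stream use that pool's workers; this relies on how current JDKs implement parallel streams, not on a documented guarantee.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SeparatePools {
    // Each square is computed by a future on futuresExecutor, while the parallel stream
    // that blocks on join() runs in its own ForkJoinPool, so neither starves the other.
    static int sumOfSquares(int n) {
        ExecutorService futuresExecutor = Executors.newFixedThreadPool(4);
        ForkJoinPool streamPool = new ForkJoinPool(2);
        try {
            List<CompletableFuture<Integer>> futures = IntStream.range(0, n)
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> i * i, futuresExecutor))
                    .collect(Collectors.toList());
            // Running the terminal operation inside a task submitted to streamPool makes the
            // parallel stream use streamPool's workers instead of ForkJoinPool.commonPool()
            // (an implementation detail of current JDKs).
            return streamPool.submit(() -> futures.parallelStream()
                    .mapToInt(CompletableFuture::join)
                    .sum()).join();
        } finally {
            streamPool.shutdown();
            futuresExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // 285
    }
}
```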

So here is the implementation:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// The methods below need an enclosing class; the name FutureStreams is only illustrative.
public final class FutureStreams {

    // Streams the values of the futures in the order in which they complete.
    public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) {
        return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel);
    }

    // Same, for futures that each produce a stream: the inner streams are concatenated.
    public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream,
                                                               boolean parallel) {
        return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity());
    }

    public static class CompletableFutureSpliterator<T> implements Spliterator<T> {
        private List<CompletableFuture<? extends T>> futures;

        // Collecting consumes the whole source stream, so all futures are created up front.
        CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) {
            futures = stream.collect(Collectors.toList());
        }

        CompletableFutureSpliterator(CompletableFuture<T>[] futures) {
            this.futures = new ArrayList<>(Arrays.asList(futures));
        }

        CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) {
            this.futures = new ArrayList<>(futures);
        }

        @Override
        public boolean tryAdvance(final Consumer<? super T> action) {
            if (futures.isEmpty())
                return false;
            // block until any remaining future is done (join() rethrows if that one failed)
            CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join();
            // now at least one of the futures has finished, get its value and remove it
            ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size());
            while (it.hasPrevious()) {
                final CompletableFuture<? extends T> future = it.previous();
                if (future.isDone()) {
                    it.remove();
                    action.accept(future.join());
                    return true;
                }
            }
            throw new IllegalStateException("Should not reach here");
        }

        @Override
        public Spliterator<T> trySplit() {
            if (futures.size() > 1) {
                int middle = futures.size() >>> 1;
                // relies on the constructor copying the list, as it gets modified in place
                Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle));
                futures = futures.subList(middle, futures.size());
                return result;
            }
            return null;
        }

        @Override
        public long estimateSize() {
            return futures.size();
        }

        // Not ORDERED: elements are produced in completion order, not encounter order.
        @Override
        public int characteristics() {
            return IMMUTABLE | SIZED | SUBSIZED;
        }
    }
}

It works by collecting the given Stream<CompletableFuture<T>> into a List of those futures. This assumes that building the stream is fast and that the hard work is done by the futures themselves, so making a list out of it shouldn't be costly. It also makes sure that all tasks have already been triggered, because it forces the source stream to be fully processed.
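Why forcing the source stream matters can be shown with a small sketch (not from the original answer; EagerStart and its methods are illustrative names, and completedFuture() stands in for real asynchronous work). With a lazy stream, each future is only created when its element is pulled, so a consumer that joins each future before pulling the next would run the tasks one after another. Collecting into a List first, as the Spliterator's constructor does, creates every task before any result is consumed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class EagerStart {
    // A lazy source of n "tasks": each future is created (and would start its work)
    // only when the stream pulls that element.
    static Stream<CompletableFuture<Integer>> source(int n, AtomicInteger started) {
        return IntStream.range(0, n).mapToObj(i -> {
            started.incrementAndGet();
            return CompletableFuture.completedFuture(i);
        });
    }

    // Joining each future as it is pulled: findFirst() short-circuits,
    // so the remaining tasks are never even created.
    static int startedWhenJoiningLazily(int n) {
        AtomicInteger started = new AtomicInteger();
        source(n, started).map(CompletableFuture::join).findFirst();
        return started.get();
    }

    // Collecting into a List first creates every task before any result is consumed.
    static int startedWhenCollectingFirst(int n) {
        AtomicInteger started = new AtomicInteger();
        source(n, started).collect(Collectors.toList());
        return started.get();
    }

    public static void main(String[] args) {
        System.out.println(startedWhenJoiningLazily(5));   // 1
        System.out.println(startedWhenCollectingFirst(5)); // 5
    }
}
```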

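To generate the output stream, tryAdvance() simply waits until any of the remaining futures completes (using CompletableFuture.anyOf()), then removes that future from the list and passes its value downstream.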
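Because tryAdvance() calls anyOf() over all remaining futures and then scans the list, each element costs time proportional to the number of futures still pending. A swapped-in technique that yields the same completion order with constant work per element is to let each future push itself into a queue when it completes. The sketch below is not part of the original answer; CompletionQueue is a hypothetical class, and a Spliterator's tryAdvance() could call its next() instead of anyOf().

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;

// Hands out the results of a fixed set of futures in the order in which they complete.
public class CompletionQueue<T> {
    private final BlockingQueue<CompletableFuture<? extends T>> done = new LinkedBlockingQueue<>();
    private int remaining;

    public CompletionQueue(Collection<? extends CompletableFuture<? extends T>> futures) {
        remaining = futures.size();
        // Each future enqueues itself once it completes, normally or exceptionally.
        // If it is already complete, the callback runs immediately on this thread.
        for (CompletableFuture<? extends T> f : futures) {
            f.whenComplete((value, error) -> done.add(f));
        }
    }

    public boolean hasNext() {
        return remaining > 0;
    }

    // Blocks until the next future completes and returns its value; an exceptional
    // completion is rethrown by join() as a CompletionException.
    public T next() {
        if (remaining == 0) throw new NoSuchElementException();
        try {
            CompletableFuture<? extends T> f = done.take();
            remaining--;
            return f.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        CompletionQueue<String> queue = new CompletionQueue<>(Arrays.asList(a, b, c));
        b.complete("b");
        c.complete("c");
        a.complete("a");
        StringBuilder order = new StringBuilder();
        while (queue.hasNext()) {
            order.append(queue.next());
        }
        System.out.println(order); // bca
    }
}
```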

A simple non-parallel usage example (the executor is used for the CompletableFutures, in order to start them all at the same time):

ExecutorService executor = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
flattenStreamOfFutures(IntStream.range(0, 20)
        .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((i % 10) * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms");
            return i;
        }, executor)), false)
        .forEach(x -> {
            System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x);
        });
executor.shutdown();

Output:

Finished 10 @ 103ms
Finished 0 @ 105ms
main @ 114ms handle result: 10
main @ 114ms handle result: 0
Finished 1 @ 1102ms
main @ 1102ms handle result: 1
Finished 11 @ 1104ms
main @ 1104ms handle result: 11
Finished 2 @ 2102ms
main @ 2102ms handle result: 2
Finished 12 @ 2104ms
main @ 2105ms handle result: 12
Finished 3 @ 3102ms
main @ 3102ms handle result: 3
Finished 13 @ 3104ms
main @ 3105ms handle result: 13
…

As you can see, each value is handed to forEach() almost as soon as its future completes, even though the futures do not complete in the order in which they were submitted.

Applying it to the example in the question gives the following (assuming parseLinks() returns a CompletableFuture<String> instead of a CompletableFuture<Void>):

flattenStreamOfFuturesOfStream(IntStream.range(0, 10)
                .mapToObj(this::getPage)
                // the next map() will give a Stream<CompletableFuture<Stream<String>>>
                // hence the need for flattenStreamOfFuturesOfStream()
                .map(pcf -> pcf
                        .thenApply(page -> flattenStreamOfFutures(page
                                        .getDocsId()
                                        .stream()
                                        .map(this::getDocument)
                                        .map(docCF -> docCF.thenCompose(this::parseLinks)),
                                false))),
        false)
.forEach(System.out::println);
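For comparison, when you only need all parsed results rather than a stream that emits them as they complete, the question's flatMap()-like flow can also be written with standard CompletableFuture methods alone: thenCompose() plays the role of flatMap(), and CompletableFuture.allOf() turns a list of futures into a single future. This is a minimal sketch, not the answer's approach; Page, Document and the stub bodies of getPage(), getDocument() and parseLinks() are hypothetical stand-ins for the question's elided methods (parseLinks() returns CompletableFuture<String>, as assumed above), and sequence() and parseAllPages() are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ComposeAllOf {
    // Hypothetical stand-ins for the question's types and methods.
    static final class Page {
        final List<Integer> docsId;
        Page(List<Integer> docsId) { this.docsId = docsId; }
        List<Integer> getDocsId() { return docsId; }
    }

    static final class Document {
        final int id;
        Document(int id) { this.id = id; }
    }

    // Page i "contains" documents 10*i and 10*i + 1.
    static CompletableFuture<Page> getPage(int i) {
        return CompletableFuture.supplyAsync(() -> new Page(Arrays.asList(10 * i, 10 * i + 1)));
    }

    static CompletableFuture<Document> getDocument(int id) {
        return CompletableFuture.supplyAsync(() -> new Document(id));
    }

    static CompletableFuture<String> parseLinks(Document doc) {
        return CompletableFuture.completedFuture("links of doc " + doc.id);
    }

    // Turns a list of futures into a future of a list, preserving the list order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // all are done here, so join() does not block
                        .collect(Collectors.toList()));
    }

    // page -> its documents -> parsed links, without blocking any thread along the way.
    static CompletableFuture<List<String>> parseAllPages(int pageCount) {
        List<CompletableFuture<List<String>>> perPage = IntStream.range(0, pageCount)
                .mapToObj(i -> getPage(i).thenCompose(page -> sequence(page.getDocsId().stream()
                        .map(id -> getDocument(id).thenCompose(ComposeAllOf::parseLinks))
                        .collect(Collectors.toList()))))
                .collect(Collectors.toList());
        return sequence(perPage).thenApply(lists -> lists.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // [links of doc 0, links of doc 1, links of doc 10, links of doc 11]
        System.out.println(parseAllPages(2).join());
    }
}
```

Unlike flattenStreamOfFuturesOfStream(), this keeps the order of pages and documents and never blocks a thread while waiting, but it delivers nothing until every document has been parsed.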
