Java 8 CompletableFuture, Stream and Timeouts


Question

I'm trying to process some data concurrently using CompletableFuture and Stream. So far I have:

public static void main(String[] args) throws InterruptedException, ExecutionException {
    System.out.println("start");

    List<String> collect = Stream.of("1", "2", "3", "4", "5",
            "6", "7")
            .map(x -> CompletableFuture.supplyAsync(getStringSupplier(x)))
            .collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    System.out.println("stop out!");
}


public static Supplier<String> getStringSupplier(String text) {
    return () -> {

        System.out.println("start " + text);
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("stop " + text);
        return "asd" + text;
    };
}

The output is as expected:

start
start 1
start 4
start 3
start 2
start 5
start 6
start 7
stop 4
stop 1
stop 5
stop 2
stop 6
stop 3
stop 7
stop out!

However, I now want to add a timeout to that job. Say it should be cancelled after 1 second and return null or some other value to the collect list. (I would prefer a value that indicates the cause.)

How can I achieve that?

Thanks in advance for your help.

Recommended answer

I have found a way of doing that:

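// Single daemon thread used only to fire timeouts.
// ThreadFactoryBuilder comes from Guava (com.google.common.util.concurrent).
private static final ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(
                1,
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("failAfter-%d")
                        .build());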

public static void main(String[] args) throws InterruptedException, ExecutionException {
    System.out.println("start");
    final CompletableFuture<Object> oneSecondTimeout = failAfter(Duration.ofSeconds(1))
            .exceptionally(xxx -> "timeout exception");
    List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
            .map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
                    , oneSecondTimeout))
            .collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    System.out.println("stop out!");
    System.out.println(collect);
}

public static CompletableFuture<String> createTaskSupplier(String x) {
    return CompletableFuture.supplyAsync(getStringSupplier(x))
            .exceptionally(xx -> "PROCESSING ERROR : " + xx.getMessage());
}


public static Supplier<String> getStringSupplier(String text) {
    return () -> {

        System.out.println("start " + text);
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (text.equals("1")) {
            throw new RuntimeException("LOGIC ERROR");
        }
        try {
            if (text.equals("7"))
                TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("stop " + text);
        return "result " + text;
    };
}

public static <T> CompletableFuture<T> failAfter(Duration duration) {
    final CompletableFuture<T> promise = new CompletableFuture<>();
    scheduler.schedule(() -> {
        final TimeoutException ex = new TimeoutException("Timeout after " + duration);
        return promise.completeExceptionally(ex);
    }, duration.toMillis(), TimeUnit.MILLISECONDS);
    return promise;
}

It returns:

start
start 1
start 3
start 4
start 2
start 5
start 6
start 7
stop 6
stop 4
stop 3
stop 5
stop 2
stop out!
[PROCESSING ERROR : java.lang.RuntimeException: LOGIC ERROR, result 2, result 3, result 4, result 5, result 6, timeout exception]
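
A side note on the code above: ThreadFactoryBuilder is a Guava class, and the single oneSecondTimeout future acts as one deadline shared by every task. The following is only a sketch of a JDK-only variant with a separate timer per task, not the method used in the answer. The class TimeoutSketch, the helper withFallbackAfter and the task supplier are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class TimeoutSketch {
    // One daemon thread that only fires timeouts (plain JDK, no Guava).
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Hypothetical helper: completes `future` with `fallback` if it is still pending after the delay.
    static <T> CompletableFuture<T> withFallbackAfter(
            CompletableFuture<T> future, T fallback, long delay, TimeUnit unit) {
        ScheduledFuture<?> timer =
                SCHEDULER.schedule(() -> future.complete(fallback), delay, unit);
        // Drop the pending timer once the future completes for any reason.
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }

    // Hypothetical task mirroring the answer: "1" fails, "7" is slow.
    static Supplier<String> task(String text) {
        return () -> {
            if (text.equals("1")) {
                throw new RuntimeException("LOGIC ERROR");
            }
            try {
                TimeUnit.MILLISECONDS.sleep(text.equals("7") ? 2000 : 100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result " + text;
        };
    }

    static List<String> runAll(List<String> inputs, long timeoutMillis) {
        // Start every task first, then join, so the tasks run concurrently.
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(x -> CompletableFuture.supplyAsync(task(x))
                        .exceptionally(e -> "PROCESSING ERROR : " + e.getMessage()))
                .map(f -> withFallbackAfter(f, "timeout exception",
                        timeoutMillis, TimeUnit.MILLISECONDS))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("1", "2", "3", "4", "5", "6", "7"), 1000));
    }
}
```

Because each future is completed in place with the fallback value, the result keeps the type List<String> instead of List<Object>. Each timer starts when its future is created, so a task that waits in the pool's queue uses up part of its timeout before it runs.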

What do you think of this solution? Can you spot any flaws in it?
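
One flaw worth noting: racing a task against a timer with anyOf only stops waiting for the task, it does not cancel the work. In the output above, "stop 7" is presumably missing only because the JVM exits while the slow task is still sleeping on a daemon thread. Separately, Java 9 added CompletableFuture.completeOnTimeout and orTimeout, which remove the need for a hand-written scheduler. The sketch below assumes Java 9 or later; the class CompleteOnTimeoutSketch and the method work are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

class CompleteOnTimeoutSketch {
    // Hypothetical stand-in for the task in the answer: "7" is the slow one.
    static String work(String text) {
        try {
            TimeUnit.MILLISECONDS.sleep(text.equals("7") ? 2000 : 100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result " + text;
    }

    static List<String> runAll(List<String> inputs, long timeoutMillis) {
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(x -> CompletableFuture.supplyAsync(() -> work(x))
                        // Java 9+: complete with a fallback value if still pending at the deadline.
                        .completeOnTimeout("timeout " + x, timeoutMillis, TimeUnit.MILLISECONDS))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("2", "7"), 1000));
    }
}
```

As with the scheduler-based version, the timed-out task keeps running in the background. If the work itself has to stop, the task needs its own cooperative cancellation, for example by checking a flag or running on an executor whose Future can be interrupted.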
