CompletableFuture没有得到执行.如果我使用ExecutorService池,则其工作正常,但不使用默认的forkJoin公共池 [英] CompletableFuture is not getting executed. If I use the ExecutorService pool its work as expected but not with the default forkJoin common pool

查看:89
本文介绍了CompletableFuture没有得到执行.如果我使用ExecutorService池,则其工作正常,但不使用默认的forkJoin公共池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图运行以下类,使其终止而不执行CompletableFuture.

public class ThenApplyExample {

public static void main(String[] args) throws Exception {
    //ExecutorService es = Executors.newCachedThreadPool();
    CompletableFuture<Student> studentCompletableFuture = CompletableFuture.supplyAsync(() -> {

        try {

            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 3;
    })// If I put executorservice created n commented above, programme work as expected.
            .thenApply(i -> {

                for (int j = 0; j <= i; j++) {
                    System.out.println("Inside first then apply");
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("First then apply is finished");
                return ++i;
            })
            .thenApply(i -> {
                System.out.println("Inside 2nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 2nd then apply stopped");

                return i++;
            })
            .thenApply(i -> {
                System.out.println("Inside 3nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 3nd then apply stopped");
                return "The i is ::: " + i;
            })
            .thenApply(s -> Student.builder().id(1).name(s).address("Some address").build());
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");

    //es.shutdown();
}
} 

我得到的输出是

Executing..
Executing..
Executing..
Executing..
Executing..

预期输出为

Executing..
Executing..
Executing..
Executing..
Executing..
Inside first then apply
Inside first then apply
Inside first then apply
Inside first then apply
First then apply is finished
Inside 2nd then apply
Inside 2nd then apply stopped
Inside 3nd then apply
Inside 3nd then apply stopped

注意::在上述程序中,我没有使用StudentCompletableFuture.get().我不想使用它,因为它会阻塞代码.

如果我在程序的最后添加了 studentCompletableFuture.get(),它会按预期运行,或者如果我在supplyAsync第二个参数中添加了 executorservice (请检查在节目中发表评论),它会再次按预期运行.

我的问题是,当程序使用默认的ForkJoin公共池时,为什么它会终止?

解决方案

Thread 的Javadoc(强调我的):

Java虚拟机启动时,通常只有一个 非守护程序线程(通常调用某些线程的名为main的方法 指定课程). Java虚拟机继续执行 线程,直到发生以下任何一种情况:

  • 已调用类Runtimeexit方法,并且安全管理器已允许进行退出操作.
  • 不是守护程序线程的所有线程都已死亡,方法是从调用返回到run方法,或者抛出异常 传播到run方法之外.

在您的代码中,您具有以下内容(简体):

public static void main(String[] args) {
    CompletableFuture<Student> future = CompletableFuture.supplyAsync(/* Supplier */)
            .andThen(/* Function One */)
            .andThen(/* Function Two */)
            .andThen(/* Function Three */)
            .andThen(/* Final Function */);
}

这使用了常见的ForkJoinPool,如上所述,它使用了守护程序线程.异步代码是从主线程启动的,但是您不必等待它完成.这意味着主线程退出,因此JVM也存在.这是在您的异步代码有机会完成之前发生的.

然后您尝试了呼叫get():

public static void main(String[] args) throws Exception {
    Student student = CompletableFuture.supplyAsync(/* Supplier */)
            .andThen(/* Function One */)
            .andThen(/* Function Two */)
            .andThen(/* Function Three */)
            .andThen(/* Final Function */)
            .get();
}

这是有效的,因为get()是阻止呼叫;这意味着主线程现在要等到异步代码完成后才能继续.换句话说,您将使主线程保持活动状态,从而使JVM保持活动状态.

当您使用来自Executors.newCachedThreadPool()的自己的ExeuctorService时,执行线程是非守护程序.这意味着异步代码正在这些非守护进程线程上运行,这使JVM保持活动状态,直到所述代码完成为止.实际上,即使您不调用ExecutorService.shutdown(),异步代码完成后 也可以使JVM保持活动状态(尽管缓存的线程池可能允许所有线程在一定时间后死亡). /p>

在问题注释中,您询问是否存在一种更优雅"的方法来使主线程保持活动状态(get()除外).我不确定您对优雅"的定义是什么,但是有

我不知道为什么会这样.运行此代码:

public static void main(String[] args) {
    CompletableFuture.runAsync(() -> {
        Thread t = Thread.currentThread();
        System.out.printf("Thread_Name: %s, Daemon: %s%n", t.getName(), t.isDaemon());
    }).join();
}

给我这个:

 Thread_Name: ForkJoinPool.commonPool-worker-9, Daemon: true
 

I am trying to run the following class its getting terminated without executing the CompletableFuture.

public class ThenApplyExample {

public static void main(String[] args) throws Exception {
    //ExecutorService es = Executors.newCachedThreadPool();
    CompletableFuture<Student> studentCompletableFuture = CompletableFuture.supplyAsync(() -> {

        try {

            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 3;
    })// If I put executorservice created n commented above, programme work as expected.
            .thenApply(i -> {

                for (int j = 0; j <= i; j++) {
                    System.out.println("Inside first then apply");
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("First then apply is finished");
                return ++i;
            })
            .thenApply(i -> {
                System.out.println("Inside 2nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 2nd then apply stopped");

                return i++;
            })
            .thenApply(i -> {
                System.out.println("Inside 3nd then apply");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Inside 3nd then apply stopped");
                return "The i is ::: " + i;
            })
            .thenApply(s -> Student.builder().id(1).name(s).address("Some address").build());
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");
    System.out.println("Executing..");

    //es.shutdown();
}
} 

Output I am getting is

Executing..
Executing..
Executing..
Executing..
Executing..

Whereas expected output is

Executing..
Executing..
Executing..
Executing..
Executing..
Inside first then apply
Inside first then apply
Inside first then apply
Inside first then apply
First then apply is finished
Inside 2nd then apply
Inside 2nd then apply stopped
Inside 3nd then apply
Inside 3nd then apply stopped

Note : In the above programme, I am not using studentCompletableFuture.get(). I dont want to use it since it blocks the code.

If I add studentCompletableFuture.get() at the very end of the programme, it works as expected or if I add the executorservice in the supplyAsync 2nd argument(check comment in programme), it works again as expected.

My question is why it is terminating when programme uses the default ForkJoin common pool?

解决方案

The threads in the ForkJoinPool are daemon threads (at least by default). Here's the relevant Javadoc (emphasis mine):

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined. All worker threads are initialized with Thread.isDaemon() set true.

The threads in the ExecutorService created by Executors are non-daemon threads. The exception, I believe, is Executors.newWorkStealingPool. I couldn't find documentation on this but this is how it's currently implemented at least. You can change this by supplying a custom ThreadFactory.

Daemon threads do not keep the JVM alive as stated in Thread's Javadoc (emphasis mine):

When a Java Virtual Machine starts up, there is usually a single non-daemon thread (which typically calls the method named main of some designated class). The Java Virtual Machine continues to execute threads until either of the following occurs:

  • The exit method of class Runtime has been called and the security manager has permitted the exit operation to take place.
  • All threads that are not daemon threads have died, either by returning from the call to the run method or by throwing an exception that propagates beyond the run method.

In your code you have the following (simplified):

public static void main(String[] args) {
    CompletableFuture<Student> future = CompletableFuture.supplyAsync(/* Supplier */)
            .andThen(/* Function One */)
            .andThen(/* Function Two */)
            .andThen(/* Function Three */)
            .andThen(/* Final Function */);
}

This uses the common ForkJoinPool which, as stated, uses daemon threads. The async code is launched from the main thread but you don't wait for it to complete. This means the main thread exits and therefore the JVM also exists. This happens before your async code had a chance to complete.

Then you tried a call to get():

public static void main(String[] args) throws Exception {
    Student student = CompletableFuture.supplyAsync(/* Supplier */)
            .andThen(/* Function One */)
            .andThen(/* Function Two */)
            .andThen(/* Function Three */)
            .andThen(/* Final Function */)
            .get();
}

This works because get() is a blocking call; meaning the main thread now waits until the async code completes before continuing. In other words, you are keeping the main thread alive which keeps the JVM alive.

When you use your own ExeuctorService from Executors.newCachedThreadPool() the executing threads are non-daemon. This means the async code is being run on these non-daemon threads which keeps the JVM alive until said code completes. In fact, it would keep the JVM alive even after the async code completes if you don't call ExecutorService.shutdown() (though a cached thread pool might allow all threads to die after a certain time).

In the question comments you ask if there is a more "elegant" way to keep the main thread alive (other than get()). I'm not sure what your definition of "elegant" is but there is the CompletableFuture.join() method. It waits, like get(), for the future to complete (normally or exceptionally) before it returns. Unlike get(), however, it doesn't throw checked exceptions; but the wait also cannot be interrupted.


You also state,

I just checked ForkJoinPool common threads are not daemon thread by using System.out.println("The thread is :: "+Thread.currentThread().getName() + Thread.currentThread().isDaemon());

I don't know why that's the case. Running this code:

public static void main(String[] args) {
    CompletableFuture.runAsync(() -> {
        Thread t = Thread.currentThread();
        System.out.printf("Thread_Name: %s, Daemon: %s%n", t.getName(), t.isDaemon());
    }).join();
}

Gives me this:

Thread_Name: ForkJoinPool.commonPool-worker-9, Daemon: true

这篇关于CompletableFuture没有得到执行.如果我使用ExecutorService池,则其工作正常,但不使用默认的forkJoin公共池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆