超时后在Java CompletableFuture中停止线程 [英] Stopping a thread in java CompletableFuture after timeout

查看:564
本文介绍了超时后在Java CompletableFuture中停止线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的Java代码中有一个异步链,我想在某个超时后停止所以我创建了一个带有一些线程的threadPool,并这样称呼CompletableFuture

  ExecutorService池= Executors.newFixedThreadPool(10); 

比我有一种循环方法,一旦所有CompletableFutures完成,它就会再次从db中加载数据并执行一些任务

  CompletableFuture< MyObject>futureTask =CompletableFuture.supplyAsync(()->候选者,池).thenApply(Task1 :: doWork).thenApply(Task2 :: doWork).thenApply(Task3 :: doWork).thenApply(Task4 :: doWork).thenApply(Task5 :: doWork).orTimeout(30,TimeUnit.SECONDS).thenApply(Task6 :: doWork).orTimeout(30,TimeUnit.SECONDS).exceptionally(ExceptionHandlerService :: handle); 

我的问题出在 task6 上,它有一个非常繁重的任务(它的网络连接任务有时会永远挂起)我注意到30秒后我的orTimeout被正确触发,但是运行Task6的线程仍在运行

经过这样的几个周期后,我所有的线程都被耗尽,我的应用程序死掉了

如何在超时后取消池中正在运行的线程?(不调用pool.shutdown())

更新 *在主线程中,我做了一个简单的检查,如下所示

  for(int i = TIME_OUT_SECONDS; i> = 0; i--){unfinishedTasks = handleFutureTasks(unfinishedTasks,totalBatchSize);if(unfinishedTasks.isEmpty()){休息;}if(i == 0){//处理任务的取消for(CompletableFuture< ComplianceCandidate>任务:unfinishedTasks){** task.cancel(true); **log.error(达到的任务超时,被取消:{}",task.isCancelled());}休息;}尝试 {TimeUnit.SECONDS.sleep(1);} catch(ex Exception){}} 

我看到的是几个周期后,所有任务都抱怨超时...在最初的1-2个周期中,我仍然得到预期的响应(尽管有线程来处理它)

我仍然觉得线程池已耗尽

解决方案

我知道您说调用 pool.shutDown ,但是根本没有其他方法.不过,当您查看阶段时,它们将在附加"线程中的任何一个中运行.它们(添加那些 thenApply )或您定义的该池中的线程.也许一个例子应该更有意义.

 公共类SO64743332 {静态ExecutorService池= Executors.newFixedThreadPool(10);公共静态void main(String [] args){CompletableFuture< String>f1 = CompletableFuture.supplyAsync(()-> dbCall(),pool);//simulateWork(4);CompletableFuture< String>f2 = f1.thenApply(x-> {System.out.println(Thread.currentThread().getName());返回transformationOne(x);});CompletableFuture< String>f3 = f2.thenApply(x-> {System.out.println(Thread.currentThread().getName());返回TransformationTwo(x);});f3.join();}私有静态字符串dbCall(){SimulationWork(2);返回"a";}私人静态字串TransformationOne(字串输入){返回输入+"b";}私人静态字串TransformationTwo(字串输入){返回输入+"b";}私人静态无效的SimulationWork(int秒){尝试 {Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));} catch(InterruptedException e){System.out.println("Interrupted!");e.printStackTrace();}}} 

上面代码的关键点是: simulateWork(4); .运行带有注释的代码,然后取消注释.看看实际上哪个线程将执行所有 thenApply .它是池中的 main same 线程,这意味着尽管您定义了一个池-它只是该池中的一个线程,它将执行所有这些阶段./p>

在这种情况下,您可以定义一个将执行所有这些阶段的单线程执行器(比如说在一个方法内部).这样,您可以控制何时调用 shutDownNow 并可能中断正在运行的任务(如果您的代码响应中断).这是一个模拟的人造例子:

 公共类SO64743332 {公共静态void main(String [] args){执行();}公共静态无效execute(){ExecutorService池= Executors.newSingleThreadExecutor();CompletableFuture< String>cf1 = CompletableFuture.supplyAsync(()-> dbCall(),pool);CompletableFuture< String>cf2 = cf1.thenApply(x-> TransformationOne(x));//给足够的时间让TransformationOne开始,但没有完成SimulationWork(2);尝试 {CompletableFuture< String>cf3 = cf2.thenApply(x-> TransformationTwo(x)).orTimeout(4,TimeUnit.SECONDS);cf3.get(10,TimeUnit.SECONDS);} catch(ExecutionException | InterruptedException | TimeoutException e){pool.shutdownNow();}}私有静态字符串dbCall(){System.out.println(开始的数据库调用");SimulationWork(1);System.out.println(通过数据库调用完成");返回"a";}私人静态字串TransformationOne(字串输入){System.out.println(开始工作");SimulationWork(10);System.out.println(完成工作");返回输入+"b";}私人静态字串TransformationTwo(字串输入){System.out.println(开始转换二");返回输入+"b";}私人静态无效的SimulationWork(int秒){尝试 {Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));} catch(InterruptedException e){System.out.println("Interrupted!");e.printStackTrace();}}} 

运行此命令,您应该注意到 transformationOne 已启动,但是由于 shutDownNow 而被中断.

这样做的缺点很明显,每次调用 execute 都会创建一个新的线程池...

I have an async chain in my java code that i want to stop after a certain timeout so i created a threadPool with some threads and called the CompletableFuture like this

ExecutorService pool = Executors.newFixedThreadPool(10);

than i have a cyclic method that loads data from the db and executes some task on it, once all the CompletableFutures are completed its doing it again

CompletableFuture<MyObject> futureTask =
                CompletableFuture.supplyAsync(() -> candidate, pool)
                .thenApply(Task1::doWork).thenApply(Task2::doWork).thenApply(Task3::doWork)
                .thenApply(Task4::doWork).thenApply(Task5::doWork).orTimeout(30,TimeUnit.SECONDS)
                .thenApply(Task6::doWork).orTimeout(30,TimeUnit.SECONDS)
                .exceptionally(ExceptionHandlerService::handle);

My problem is in task6, that has a very intensive task (its a network connection task that sometimes hangs forever) i noticed that my orTimeout is being fired correctly after 30 seconds, but the thread running Task6 is still being running

after few cycles like this, all my threads are drained and my app dies

How can i cancel the running threads on the pool after the timeout has reached? (without calling pool.shutdown())

UPDATE* inside the main thread i did a simple check as shown here

for (int i = TIME_OUT_SECONDS; i >= 0; i--) {
                unfinishedTasks = handleFutureTasks(unfinishedTasks, totalBatchSize);
                if(unfinishedTasks.isEmpty()) {
                    break;
                }
                if(i==0) {
                    //handle cancelation of the tasks
                    for(CompletableFuture<ComplianceCandidate> task: unfinishedTasks) {
                        **task.cancel(true);**
                        log.error("Reached timeout on task, is canceled: {}", task.isCancelled());
                    }
                    break;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ex) {
                }
            }

What i see is that after few cycles, all the tasks complain about timeout... in the first 1-2 cycles, i still get epected responses (while there are threads to process it)

i still feel that the thread pool is exhausted

解决方案

I know you said without calling pool.shutDown, but there is simply no other way. When you look at your stages though, they will run in either the thread that "appends" them (adding those thenApply) or a thread from that pool that you define. May be an example should make more sense.

public class SO64743332 {

    static ExecutorService pool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);

        //simulateWork(4);

        CompletableFuture<String> f2 = f1.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationOne(x);
        });

        CompletableFuture<String> f3 = f2.thenApply(x -> {
            System.out.println(Thread.currentThread().getName());
            return transformationTwo(x);
        });

        f3.join();
    }

    private static String dbCall() {
        simulateWork(2);
        return "a";
    }

    private static String transformationOne(String input) {
        return input + "b";
    }

    private static String transformationTwo(String input) {
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

They key point of the above code is this : simulateWork(4);. Run the code with it commented out and then uncomment it. See what thread is actually going to execute all those thenApply. It is either main or the same thread from the pool, meaning although you have a pool defined - it's only a single thread from that pool that will execute all those stages.

In this context, you could define a single thread executor (inside a method let's say) that will run all those stages. This way you could control when to call shutDownNow and potentially interrupt (if your code responds to interrupts) the running task. Here is a made-up example that simulates that:

public class SO64743332 {

    public static void main(String[] args) {
        execute();
    }


    public static void execute() {

        ExecutorService pool = Executors.newSingleThreadExecutor();

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> dbCall(), pool);
        CompletableFuture<String> cf2 = cf1.thenApply(x -> transformationOne(x));

        // give enough time for transformationOne to start, but not finish
        simulateWork(2);

        try {
            CompletableFuture<String> cf3 = cf2.thenApply(x -> transformationTwo(x))
                                               .orTimeout(4, TimeUnit.SECONDS);
            cf3.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            pool.shutdownNow();
        }

    }

    private static String dbCall() {
        System.out.println("Started DB call");
        simulateWork(1);
        System.out.println("Done with DB call");
        return "a";
    }

    private static String transformationOne(String input) {
        System.out.println("Started work");
        simulateWork(10);
        System.out.println("Done work");
        return input + "b";
    }

    private static String transformationTwo(String input) {
        System.out.println("Started transformation two");
        return input + "b";
    }

    private static void simulateWork(int seconds) {
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
        } catch (InterruptedException e) {
            System.out.println("Interrupted!");
            e.printStackTrace();
        }
    }
}

Running this you should notice that transformationOne starts, but it is interrupted because of the shutDownNow.

The drawback of this should be obvious, every invocation of execute will create a new thread pool...

这篇关于超时后在Java CompletableFuture中停止线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆