What is terminating my Java ExecutorService?


Question

I originally saw this issue with a more complex subclass of ThreadPoolExecutor, but I have since simplified it so that it now contains little more than some additional debugging, and I still get the same problem.

import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.logging.Level;



public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
    /**
     * Uses the default CallerRunsPolicy when queue is full
     *  @param workerSize
     * @param threadFactory
     * @param queue
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new CallerRunsPolicy());
    }

    /**
     * Allow caller to specify the RejectedExecutionPolicy
     *  @param workerSize
     * @param threadFactory
     * @param queue
     * @param reh
     */
    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue, RejectedExecutionHandler reh)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, reh);
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    /**
     * Check not been paused
     *
     * @param t
     * @param r
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        SongKong.checkIn();
    }

    /**
     * After execution
     *
     * @param r
     * @param t
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t)
    {
        super.afterExecute(r, t);

        if (t == null && r instanceof Future<?>)
        {
            try
            {
              Object result = ((Future<?>) r).get();
            }
            catch (CancellationException ce)
            {
                t = ce;
            }
            catch (ExecutionException ee)
            {
                t = ee.getCause();
            }
            catch (InterruptedException ie)
            {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
        {
            MainWindow.logger.log(Level.SEVERE, "AFTER EXECUTE---" + t.getMessage(), t);
        }
    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
    }

    @Override
    public void shutdown()
    {
        MainWindow.logger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        MainWindow.userInfoLogger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.logger.log(Level.SEVERE, ste.toString());
        }
        for(StackTraceElement ste:stackTrace)
        {
            MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
        }
        super.shutdown();
    }
}

This ExecutorService is used by the following class, which allows instances to submit tasks asynchronously. The ExecutorService should not be shut down until all submitted tasks have completed.

package com.jthink.songkong.analyse.analyser;

import com.jthink.songkong.preferences.GeneralPreferences;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
 *  Sets a timeout of each task submitted and cancel them if take longer than the timeout
 *
 *  The timeout is set to 30 minutes, we only want to call if really broken, it should not happen under usual circumstances
 */
public class MainAnalyserService extends AnalyserService
{
    //For monitoring/controlling when finished
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    //If task has not completed 30 minutes after it started (added to queue) then it should be cancelled
    private static final int TIMEOUT_PER_TASK = 30;

    private static MainAnalyserService mas;

    public static MainAnalyserService getInstanceOf()
    {
        return mas;
    }

    public static MainAnalyserService create(String threadGroup)
    {
        mas = new MainAnalyserService(threadGroup);
        return mas;
    }

    public MainAnalyserService(String threadGroup)
    {
        super(threadGroup);
        initExecutorService();
    }

    /**
    Configure thread to match cpus but even if single cpu ensure have at least two threads to protect against
    scenario where there is only cpu and that thread is waiting on i/o rather than being cpu bound this would allow
    other thread to do something.
     */
    @Override
    protected void initExecutorService()
    {
        int workerSize = GeneralPreferences.getInstance().getWorkers();
        if(workerSize==0)
        {
            workerSize = Runtime.getRuntime().availableProcessors();
        }
        //Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
        if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }
        MainWindow.userInfoLogger.severe("Workers Configuration:"+ workerSize);
        MainWindow.logger.severe("Workers Configuration:"+ workerSize);

        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES,
                new EnsureIncreaseCountIfRunOnCallingThread());
    }

    public AtomicInteger getPendingItems()
    {
        return pendingItems;
    }

    /**
     * If queue is full this gets called and we log that we run task on local calling thread.
     */
    class EnsureIncreaseCountIfRunOnCallingThread implements RejectedExecutionHandler
    {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public EnsureIncreaseCountIfRunOnCallingThread() { }

        /**
         * Executes task on calling thread, ensuring we increment count
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown())
            {
                try
                {
                    MainWindow.userInfoLogger.severe(">>SubmittedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" + pendingItems.get());
                    r.run();
                    MainWindow.userInfoLogger.severe(">>CompletedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" +  pendingItems.get());
                }
                catch(Exception ex)
                {
                    MainWindow.userInfoLogger.log(Level.SEVERE, ex.getMessage(), ex);
                }
            }
        }
    }

    /**
     * Increase count and then Submit to ExecutorService
     *
     * @param callingTask
     * @param task
     */
    public void submit(Callable<Boolean> callingTask, Callable<Boolean> task) //throws Exception
    {
        //Ensure we increment before calling submit in case rejectionExecution comes into play
        int remainingItems = pendingItems.incrementAndGet();
        executorService.submit(task);
        MainWindow.userInfoLogger.severe(">>Submitted:" + task.getClass().getName() + ":" + remainingItems);
    }

    public ExecutorService getExecutorService()
    {
        return executorService;
    }

    /**
     * Must be called by Callable when it has finished work (or if error)
     *
     * @param task
     */
    public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.userInfoLogger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);
        if (remainingItems == 0)
        {
            MainWindow.userInfoLogger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

    /**
     * Wait for latch to close, this should occur once all submitted aysync tasks have finished in some way
     *
     * @throws InterruptedException
     */
    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}

The calling class has:

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

For one customer, the terminated() method was being called even though there were still tasks that had not completed; the ExecutorService had only been running for 8 minutes, and no tasks had timed out. I have also seen the problem locally.

Debugging shows:

UserLog

05/07/2019 11.29.38:EDT:SEVERE: ----G14922:The Civil War:8907617:American Songs of Revolutionary Times and the Civil War Era:NoScore
05/07/2019 11.29.38:EDT:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.SongSaver:69
05/07/2019 11.29.38:EDT:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:68
05/07/2019 11.29.38:EDT:SEVERE: >MainAnalyser Finished
05/07/2019 11.29.38:EDT:INFO: Stop

DebugLog

   05/07/2019 11.29.38:EDT:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker

So we can see there are still 68 tasks to complete, and MainAnalyser has not closed the latch, yet the thread pool executor has terminated.

I overrode shutdown() to see whether it was being called, and it was not.

terminated() is being called from runWorker(). runWorker() should continue looping until the queue is empty, which it is not, but something seems to cause it to leave the loop, and processWorkerExit(), after doing some more checks, eventually terminates the whole executor (not just a worker thread).
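For context, a stack trace of this shape (Worker.run, runWorker, processWorkerExit, tryTerminate, terminated) is also what a normal shutdown looks like when the last busy worker finishes after shutdown() has been requested. The following is a minimal sketch of that sequence, not the SongKong code; the class names TerminatedDemo and RecordingPool and the thread name "demo-worker" are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TerminatedDemo {

    // Hypothetical pool that records which thread ran terminated()
    static class RecordingPool extends ThreadPoolExecutor {
        final AtomicReference<String> terminatedOn = new AtomicReference<>();

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                    r -> new Thread(r, "demo-worker"));
        }

        @Override
        protected void terminated() {
            terminatedOn.set(Thread.currentThread().getName());
            super.terminated();
        }
    }

    // Returns the name of the thread on which terminated() was invoked
    static String run() {
        RecordingPool pool = new RecordingPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Keep the single worker busy so that it is still running when shutdown() is called
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            pool.shutdown();      // state becomes SHUTDOWN, the busy worker keeps running
            release.countDown();  // let the task finish; the worker then finds nothing left to do
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.terminatedOn.get();
    }

    public static void main(String[] args) {
        System.out.println("terminated() ran on: " + run());
    }
}
```

In this sketch terminated() runs on the worker thread, not on the thread that called shutdown(), so seeing runWorker() in the trace does not by itself rule out an earlier shutdown request.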

10/07/2019 07.11.51:BST:MainAnalyserService:submit:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:809
10/07/2019 07.11.51:BST:MainAnalyserService:workDone:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.MusicBrainzSongGroupMatcher2:808
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.getStackTrace(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: com.jthink.songkong.analyse.analyser.TimeoutThreadPoolExecutor.terminated(TimeoutThreadPoolExecutor.java:118)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.tryTerminate(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.processWorkerExit(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.run(Unknown Source)

Because ThreadPoolExecutor is part of standard Java I cannot (easily) set breakpoints to find out what it is doing. This is the ThreadPoolExecutor code (standard Java, not my code):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

We experimented with the queue size in the executor. By default it was 100, because I did not want it to get too large: queued tasks use more memory, and I would rather the calling task simply ran the work itself when the queue is busy. But in an attempt to solve the issue (and to remove the need for the CallerRunsPolicy-style handler to be invoked because the queue was full) I increased the queue size to 1000, which caused the error to occur more quickly. I then removed the limit completely, and it continued to fail even more rapidly.

 new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),

I was looking for an alternative to ThreadPoolExecutor and came across ForkJoinPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

One thing I noticed is that ForkJoinPool has different methods for submitting tasks from within a task already submitted to the ForkJoinPool, compared to submitting from outside. I don't know why this is, but I am wondering whether submitting tasks from within tasks being run by the executor could cause the issue in some way?

I have now managed to create my own version of ThreadPoolExecutor by simply copying and pasting the code into a new class and renaming it. I also had to create a version of RejectedExecutionHandler that expects my class rather than ThreadPoolExecutor, and I have got this running.

I have started to add some debugging to see if I can decipher what is going on. Any ideas?

After the call to processWorkerExit I added:

 MainWindow.userInfoLogger.severe("-----------------------"+getTaskCount()
                    +":"+getActiveCount()
                    +":"+w.completedTasks
                    +":"+ completedAbruptly);

and on failure it logged:

-----------------------3686:0:593:false

Recommended answer

For a long time I thought the problem must be in my code. I then started to think the issue was in ThreadPoolExecutor, but adding debugging to my own version of runWorker() showed that the problem was indeed in my own code.

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                MainWindow.userInfoLogger.severe("-----------------------"+workQueue.size());

From this I could see that whilst the work queue was generally getting longer and matched the value of

MainThreadAnalyzer.pendingItems -noOfWorkerThreads

at a particular point the two values diverged, and this was when the SongLoader process (which, mistakenly, I had not really considered) finished. So MainThreadAnalyzer was continuing to submit work, increasing the value of pendingItems, but the executor's work queue was getting smaller.

This led to the realization that shutdown() had been called on the executor much earlier, but we had not noticed because the latch is only checked after SongLoader has finished.
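One reason an early shutdown can go unnoticed is that, once a ThreadPoolExecutor has been shut down, new submissions are handed to the RejectedExecutionHandler, and a handler that only acts when the executor is not shut down will drop them without any error. The sketch below illustrates this with a handler modelled on the isShutdown() check in the question; the class name SilentDropDemo and the dropped counter are additions for illustration only (the question's handler does nothing at all in that branch).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SilentDropDemo {

    // Returns {number of tasks executed, number of tasks dropped}
    static int[] run() {
        AtomicInteger executed = new AtomicInteger();
        AtomicInteger dropped = new AtomicInteger();

        // Caller-runs style handler: runs the task locally unless the pool is shut down
        RejectedExecutionHandler handler = (r, e) -> {
            if (!e.isShutdown()) {
                r.run();
            } else {
                // Counted here only to make the drop visible
                dropped.incrementAndGet();
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10), handler);

        pool.shutdown();                                  // shut down before anything is submitted
        pool.execute(() -> executed.incrementAndGet());   // rejected, handled on the calling thread

        return new int[] { executed.get(), dropped.get() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("executed=" + result[0] + ", dropped=" + result[1]);
    }
}
```

No exception reaches the submitter in this sketch, so a counter incremented before submission keeps growing even though the task never runs. Logging in the shut-down branch of the handler would probably have surfaced the problem sooner.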

The reason it had shut down was that, early on, the MainAnalyzerThread was completing work more quickly than SongLoader was submitting it, so the value of pendingItems temporarily dropped to zero, allowing the latch to be closed.
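The race can be reproduced without any threads. The following sketch uses the same counter-plus-latch idea as MainAnalyserService, reduced to its bookkeeping; the class name PrematureZeroDemo and its method names are illustrative and not taken from the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class PrematureZeroDemo {
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    // Called by the producer for each item handed to the executor
    void submit() {
        pendingItems.incrementAndGet();
    }

    // Called by a consumer when an item has been processed
    void workDone() {
        if (pendingItems.decrementAndGet() == 0) {
            latch.countDown();
        }
    }

    boolean latchClosed() {
        return latch.getCount() == 0;
    }

    // Simulates a producer with three items and a consumer that is faster than the producer
    static boolean closesEarly() {
        PrematureZeroDemo tracker = new PrematureZeroDemo();
        tracker.submit();     // item 1 of 3 is submitted
        tracker.workDone();   // item 1 finishes before item 2 has been submitted
        boolean closedEarly = tracker.latchClosed();
        tracker.submit();     // items 2 and 3 arrive after the latch has already closed
        tracker.submit();
        return closedEarly;
    }

    public static void main(String[] args) {
        System.out.println("latch closed with work still to come: " + closesEarly());
    }
}
```

A counter that reaches zero only shows that the consumers have caught up, not that the producer has finished, and a CountDownLatch cannot be reopened once its count is zero.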

The solution is as follows:

Add a boolean flag to indicate when SongLoader has completed, and only allow the latch to be closed once this flag is set.

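//volatile so that the value set by the main thread is visible to the worker threads calling workDone()
private volatile boolean songLoaderCompleted = false;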
public void workDone(Callable task)
    {
        int remainingItems = pendingItems.decrementAndGet();
        MainWindow.logger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);

        if (remainingItems == 0 && songLoaderCompleted)
        {
            MainWindow.logger.severe(">Closing Latch:");
            latch.countDown();
        }
    }

Then, in the main thread, set this flag once SongLoader has completed:

 //Start SongLoader
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);

//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all folder
//submissions completed to the MainAnalyserService
songLoaderService.shutdown();
songLoaderService.awaitTermination(10, TimeUnit.DAYS);
MainWindow.userInfoLogger.severe(">Song Loader Finished");

//Were now allowed to consider closing the latch because we know all songs have now been loaded
//so no false chance of zeroes
analyserService.setSongLoaderCompleted();

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

//This should be immediate as there should be no tasks still remaining
analyserService.getExecutorService().shutdown();
analyserService.getExecutorService().awaitTermination(10, TimeUnit.DAYS);
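The flag-based fix relies on setSongLoaderCompleted(), whose body is not shown. If it only sets the flag, and every task has already finished by the time it is called, no later workDone() call would be left to close the latch. One alternative, offered as a sketch and not as the author's method, is to count the producer itself as one pending item, so that the counter cannot reach zero until the producer has announced that it is finished. The class ProducerAwareTracker and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerAwareTracker {
    // Starts at 1: the producer holds one "ticket" until it has finished submitting
    private final AtomicInteger pendingItems = new AtomicInteger(1);
    private final CountDownLatch latch = new CountDownLatch(1);

    // Call before handing each task to the executor
    public void submitted() {
        pendingItems.incrementAndGet();
    }

    // Call from each task when it has finished (or failed)
    public void workDone() {
        release();
    }

    // Call exactly once, when the producer will submit nothing more
    public void producerDone() {
        release();
    }

    private void release() {
        if (pendingItems.decrementAndGet() == 0) {
            latch.countDown();
        }
    }

    public boolean isComplete() {
        return latch.getCount() == 0;
    }

    public void awaitCompletion() throws InterruptedException {
        latch.await();
    }

    // Returns {complete after a fast consumer caught up, complete after the producer finished}
    static boolean[] demo() {
        ProducerAwareTracker tracker = new ProducerAwareTracker();
        tracker.submitted();
        tracker.workDone();                                // consumer catches up with the producer
        boolean afterFastConsumer = tracker.isComplete();  // producer ticket still outstanding
        tracker.producerDone();                            // producer gives up its ticket
        boolean afterProducerDone = tracker.isComplete();
        return new boolean[] { afterFastConsumer, afterProducerDone };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("after fast consumer: " + result[0] + ", after producer done: " + result[1]);
    }
}
```

With this arrangement the order of the last workDone() and producerDone() does not matter, since whichever call brings the counter to zero closes the latch, and no separate flag needs to be shared between threads. java.util.concurrent.Phaser offers a similar register/arrive model if a library class is preferred.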
