如何终止多线程中超时的任务? [英] How can I terminate Tasks that have timed out in multithreading?

查看:887
本文介绍了如何终止多线程中超时的任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要创建一个库,其中包含同步和异步方法。

I need to make a library in which I will have synchronous and asynchronous methods in it.


  • executeSynchronous () - 等到我有结果,返回结果。

  • executeAsynchronous() - 返回如果需要的话,可以在完成其他事情后立即处理的未来。

  • executeSynchronous() - waits until I have a result, returns the result.
  • executeAsynchronous() - returns a Future immediately which can be processed after other things are done, if needed.

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递 DataKey 构建器对象来调用它。然后,我们将使用该 DataKey 对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,并在我们将响应作为JSON字符串返回后,我们将通过创建 DataResponse 对象将JSON字符串发送回我们的客户。有些客户会调用 executeSynchronous(),有些可能会调用 executeAsynchronous(),这就是为什么我需要单独提供两种方法在我的图书馆。

The customer will use our library and they will call it by passing DataKey builder object. We will then construct a URL by using that DataKey object and make a HTTP client call to that URL by executing it and after we get the response back as a JSON String, we will send that JSON String back to our customer as it is by creating DataResponse object. Some customer will call executeSynchronous() and some might call executeAsynchronous() so that's why I need to provide two method separately in my library.

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

然后我的 DataClient 实现上述客户端界面:

And then I have my DataClient which implements the above Client interface:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this look right the way I am doing it?
            future.cancel(true); // terminating tasks that have timed out.
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

将执行实际任务的简单类:

Simple class which will perform the actual task:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

问题陈述: -

当我开始研究这个解决方案时,我没有终止已经超时的任务。我向客户端报告超时,但任务继续在线程池中运行(可能长时间占用我有限的10个线程之一)。所以我在网上进行了一些研究,发现我可以在 future 上使用取消取消已经超时的任务如下所示 -

When I started working on this solution, I was not terminating the tasks that have timed out. I was reporting the timeout to the client, but the task continues to run in the thread pool (potentially occupying one of my limited 10 threads for a long time). So I did some research online and I found that I can cancel my tasks those have timed out by using cancel on future as shown below -

future.cancel(true);

但我想确定,我在<$ c中的表现是否正确$ c> executeSynchronous 取消已经超时的任务的方法?

But I wanted to make sure, does it look right the way I am doing in my executeSynchronous method to cancel the tasks that have got timedout?

因为我在 Future cancel() >如果任务仍然在队列中,那将阻止它运行所以我不确定我在做什么是对的?这样做的正确方法是什么?

Since I am calling cancel() on theFuture which will stop it from running if tasks is still in the queue so I am not sure what I am doing is right or not? What is the right approach to do this?

如果有更好的方法,那么任何人都能提供一个例子吗?

If there is any better way, then can anyone provide an example for that?

推荐答案

如果任务仍然在队列中,那么只需调用 future.cancel()就可以取消它,但很明显你不知道那是否在队列中。即使你要求 future 来中断任务,它也可能无法工作,因为你的任务仍然可以做一些忽略线程中断状态的事情。

If task is still in the queue then cancelling it by simply calling future.cancel() is ok but obviously you don't know if that is in the queue. Also even if you ask future to interrupt the task it may not work as your task can still be doing something which is ignoring the thread interrupted status.

所以你可以使用 future.cancel(true)但你需要确保你的任务(线程)确实认为线程中断状态。例如,如您所述,您进行了http调用,因此您可能需要在线程中断时立即关闭http客户端资源。

So you can use the future.cancel(true) but you need to make sure that your task (thread) does regard the thread interrupted status. For example as you mentioned you make http call, so you might need to close the http client resource as soon as thread is interrupted.

请参阅下面的示例。

我试图实现任务取消方案。通常,线程可以检查 isInterrupted()并尝试终止自身。但是当你使用线程池执行器时,这变得更加复杂,可调用,如果任务不像那么(!Thread.isInterrupted()){//执行任务}

I have tried to implement the task cancellation scenario. Normally a thread can check isInterrupted() and try to terminate itself. But this becomes more complex when you are using thread pool executors, callable and if the task is not really like while(!Thread.isInterrupted()) {// execute task}.

在这个例子中,一个任务是写一个文件(我没有使用http调用来保持简单)。线程池执行程序开始运行任务,但调用者想要在100毫秒后取消它。现在将来将中断信号发送给线程,但是可调用任务在写入文件时不能立即检查它。因此,要使这种情况发生可调用,请维护一个将要使用的IO资源列表,并且一旦未来想要取消该任务,它就会在所有IO资源上调用 cancel()使用IOException终止任务,然后完成线程。

In this example, a task is writing a file (I did not use http call to keep the it simple). A thread pool executor starts running the task but the caller wants to cancel it just after 100 milli seconds. Now future sends the interrupt signal to the thread but the callable task can not check it immediately while writing to file. So to make this happen callable maintains a list of IO resources it is going to use and as soon as future wants to cancel the task it just calls cancel() on all IO resources which terminates the task with IOException and then thread finishes.

public class CancellableTaskTest {

    public static void main(String[] args) throws Exception {
        CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        long startTime = System.currentTimeMillis();
        Future<String> future = threadPoolExecutor.submit(new CancellableTask());
        while (System.currentTimeMillis() - startTime < 100) {
            Thread.sleep(10);
        }
        System.out.println("Trying to cancel task");
        future.cancel(true);
    }
}

class CancellableThreadPoolExecutor extends ThreadPoolExecutor {

    public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new CancellableFutureTask<T>(callable);
    }
}

class CancellableFutureTask<V> extends FutureTask<V> {

    private WeakReference<CancellableTask> weakReference;

    public CancellableFutureTask(Callable<V> callable) {
        super(callable);
        if (callable instanceof CancellableTask) {
            this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean result = super.cancel(mayInterruptIfRunning);
        if (weakReference != null) {
            CancellableTask task = weakReference.get();
            if (task != null) {
                try {
                    task.cancel();
                } catch (Exception e) {
                    e.printStackTrace();
                    result = false;
                }
            }
        }
        return result;
    }
}

class CancellableTask implements Callable<String> {

    private volatile boolean cancelled;
    private final Object lock = new Object();
    private LinkedList<Object> cancellableResources = new LinkedList<Object>();

    @Override
    public String call() throws Exception {
        if (!cancelled) {
            System.out.println("Task started");
            // write file
            File file = File.createTempFile("testfile", ".txt");
            BufferedWriter writer = new BufferedWriter(new FileWriter(file));
            synchronized (lock) {
                cancellableResources.add(writer);
            }
            try {
                long lineCount = 0;
                while (lineCount++ < 100000000) {
                    writer.write("This is a test text at line: " + lineCount);
                    writer.newLine();
                }
                System.out.println("Task completed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writer.close();
                file.delete();
                synchronized (lock) {
                    cancellableResources.clear();
                }
            }
        }
        return "done";
    }

    public void cancel() throws Exception {
        cancelled = true;
        Thread.sleep(1000);
        boolean success = false;
        synchronized (lock) {
            for (Object cancellableResource : cancellableResources) {
                if (cancellableResource instanceof Closeable) {
                    ((Closeable) cancellableResource).close();
                    success = true;
                }
            }
        }
        System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));
    }
}

对于您的REST Http客户端相关要求,您可以修改像这样的工厂类 -

For your REST Http client related requirement you can modify the factory class something like this -

public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {

    private List<Object> cancellableResources;

    public CancellableSimpleClientHttpRequestFactory() {
    }

    public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {
        this.cancellableResources = cancellableResources;
    }

    protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {
        HttpURLConnection connection = super.openConnection(url, proxy);
        if (cancellableResources != null) {
            cancellableResources.add(connection);
        }
        return connection;
    }
}

这里你需要在创建<$时使用这个工厂你的runnable任务中的c $ c> RestTemplate 。

    RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));

确保您传递了在 CancellableTask 。

现在你需要在<修改 cancel()方法code> CancellableTask 喜欢这个 -

Now you need to modify the cancel() method in CancellableTask like this -

synchronized (lock) {
    for (Object cancellableResource : cancellableResources) {
        if (cancellableResource instanceof HttpURLConnection) {
            ((HttpURLConnection) cancellableResource).disconnect();
            success = true;
        }
    }
}

这篇关于如何终止多线程中超时的任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆