如果取消时正在运行runnable,如何取消ShceduledFuture并等待runnable停止? [英] How to cancel ShceduledFuture and wait for runnable to stop, if runnable is in progress at the moment of cancellation?

查看:202
本文介绍了如果取消时正在运行runnable,如何取消ShceduledFuture并等待runnable停止?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当在任何ScheduledExecutorService上以固定速率调度任何命令时,它返回ScheduledFuture,也可以将其取消. 但是取消"不能保证取消返回后命令仍未执行,例如,因为调用取消"时命令已经在执行中间.

When any command scheduled with fixed rate at any ScheduledExecutorService, it returns ScheduledFuture which can be cancelled as well. But "cancel" does not provide guarantee that command is not still executing after cancel returns, for example because command was already in the middle of execution when "cancell" was called.

对于大多数用例来说,它已经足够了.但是我需要在取消后阻塞当前线程的情况下处理用例,如果命令已经在执行中,请等到命令完成.换句话说,如果命令仍在执行,则称为取消"的线程不应前进.使用mayInterruptIfRunning = true取消也是不合适的,因为我不想破坏当前执行,我只需要等待正常完成即可.

For mostly use cases it is enough functionality. But I have deal with usecase when need to block current thread after cancel, if command already is in progress, and wait until command done. In other words thread which called cancel should not go forward if command still executing. Cancelling with mayInterruptIfRunning=true also is not suitable, because I do not want to broke current executions, I just need to wait for normal complete.

我没有找到如何通过标准JDK类达到此要求的方法. 问题1 :我是错的并且存在这种功能吗?

I did not found how to achieve this requirements via standard JDK classes. Question1: Was I wrong and this kind of functionality exists?

因此,我决定自己实施: 导入java.util.concurrent.*;

So I decided to implement it by itself: import java.util.concurrent.*;

public class GracefullyStoppingScheduledFutureDecorator implements ScheduledFuture {

/**
 * @return the scheduled future with method special implementation of "cancel" method, 
 * which in additional to standard implementation, 
 * provides strongly guarantee that command is not in the middle of progress when "cancel" returns  
 */
public static ScheduledFuture schedule(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
    CancellableCommand cancellableCommand = new CancellableCommand(command);
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
    return new GracefullyStoppingScheduledFutureDecorator(future, cancellableCommand);
}

private GracefullyStoppingScheduledFutureDecorator(ScheduledFuture targetFuture, CancellableCommand command) {
    this.targetFuture = targetFuture;
    this.runnable = command;
}

private final ScheduledFuture targetFuture;
private final CancellableCommand runnable;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    runnable.cancel();
    return targetFuture.cancel(mayInterruptIfRunning);
}

@Override
public long getDelay(TimeUnit unit) {
    return targetFuture.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
    return targetFuture.compareTo(o);
}

@Override
public boolean isCancelled() {
    return targetFuture.isCancelled();
}

@Override
public boolean isDone() {
    return targetFuture.isDone();
}

@Override
public Object get() throws InterruptedException, ExecutionException {
    return targetFuture.get();
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return targetFuture.get(timeout, unit);
}

private static class CancellableCommand implements Runnable {

    private final Object monitor = new Object();
    private final Runnable target;
    private boolean cancelled = false;

    private CancellableCommand(Runnable target) {
        this.target = target;
    }

        public void cancel() {
            synchronized (monitor) {
                cancelled = true;
            }
        }

        @Override
        public void run() {
            synchronized (monitor) {
                if (!cancelled) {
                    target.run();
                }
            }
        }

    }

}

问题2 :有人可以在上面的代码中发现错误吗?

Question2: Could anybody find errors in the code above?

推荐答案

问题2:有人可以在上面的代码中发现错误吗?

Question2: Could anybody find errors in the code above?

有一个假想的死锁,可以通过以下情形重现:

There is hypothetical deadlock which can be reproduced by following scenario:

  1. 具有容纳监视器M1的线程T1
  2. 计划任务正在线程T2上执行(将其监视器M2保留)并想进入M1,因此T2需要等待,直到T1退出监视器M1.
  3. T1决定取消任务,但是由于其监视器M2被任务本身锁定,因此我们陷入了僵局.

最有可能出现的abovr场景是不真实的,但是为了保护所有可能的情况,我决定以无锁的方式重写代码:

Most likely scenario abovr is unreal, but to protect from all possible cases, I decided to rewrite code in lock-free manner:

public class GracefullyStoppingScheduledFuture {

/**
 * @return the scheduled future with method special implementation of "cancel" method,
 * which in additional to standard implementation,
 * provides strongly guarantee that command is not in the middle of progress when "cancel" returns
 */
public static GracefullyStoppingScheduledFuture cheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
    CancellableCommand cancellableCommand = new CancellableCommand(command);
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
    return new GracefullyStoppingScheduledFuture(future, cancellableCommand);
}

private GracefullyStoppingScheduledFuture(ScheduledFuture targetFuture, CancellableCommand command) {
    this.targetFuture = targetFuture;
    this.runnable = command;
}

private final ScheduledFuture targetFuture;
private final CancellableCommand runnable;

public void cancelAndBeSureOfTermination(boolean mayInterruptIfRunning) throws InterruptedException, ExecutionException {
    try {
        targetFuture.cancel(mayInterruptIfRunning);
    } finally {
        runnable.cancel();
    }
}

private static class CancellableCommand implements Runnable {

    private static final int NOT_EXECUTING = 0;
    private static final int IN_PROGRESS = 1;
    private static final int CANCELLED_WITHOUT_OBSTRUCTION = 2;
    private static final int CANCELLED_IN_MIDDLE_OF_PROGRESS = 3;

    private final AtomicInteger state = new AtomicInteger(NOT_EXECUTING);
    private final AtomicReference<Thread> executionThread = new AtomicReference<>();
    private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
    private final Runnable target;

    private CancellableCommand(Runnable target) {
        this.target = target;
    }

    public void cancel() throws ExecutionException, InterruptedException {
        if (executionThread.get() == Thread.currentThread()) {
            // cancel method was called from target by itself
            state.set(CANCELLED_IN_MIDDLE_OF_PROGRESS);
            return;
        }
        while (true) {
            if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
                return;
            }
            if (state.get() == CANCELLED_IN_MIDDLE_OF_PROGRESS) {
                cancellationFuture.get();
                return;
            }
            if (state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION)) {
                return;
            }
            if (state.compareAndSet(IN_PROGRESS, CANCELLED_IN_MIDDLE_OF_PROGRESS)) {
                cancellationFuture.get();
                return;
            }
        }
    }

    @Override
    public void run() {
        if (!state.compareAndSet(NOT_EXECUTING, IN_PROGRESS)) {
            notifyWaiters();
            return;
        }

        try {
            executionThread.set(Thread.currentThread());
            target.run();
        } finally {
            executionThread.set(null);
            if (!state.compareAndSet(IN_PROGRESS, NOT_EXECUTING)) {
                notifyWaiters();
            }
        }
    }

    private void notifyWaiters() {
        if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
            // no need to notify anything
            return;
        }
        // someone waits for cancelling
        cancellationFuture.complete(null);
        return;
    }

}

这篇关于如果取消时正在运行runnable,如何取消ShceduledFuture并等待runnable停止?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆