在后台线程中运行的可编辑任务队列 [英] Editable queue of tasks running in background thread
问题描述
我知道这个问题已经回答了很多遍,但是我一直在努力了解它的工作原理.
因此,在我的应用程序中,用户必须能够选择要添加到队列中的项目(使用ObservableList<Task>
显示在ListView
中),并且每个项目都需要由ExecutorService
顺序处理.
该队列也应该是可编辑的(更改顺序并从列表中删除项目).
private void handleItemClicked(MouseEvent event) {
if (event.getClickCount() == 2) {
File item = listView.getSelectionModel().getSelectedItem();
Task<Void> task = createTask(item);
facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
Future result = executor.submit(task);
// where executor is an ExecutorService of which type?
try {
result.get();
} catch (Exception e) {
// ...
}
}
}
与executor = Executors.newFixedThreadPool(1)
进行了尝试,但我无法控制队列.
我读到有关ThreadPoolExecutor
和队列的信息,但是由于对Concurrency还是很陌生,所以我很难理解它.
我需要在后台线程中运行该方法handleItemClicked
,以使UI不会冻结,我该怎么做才是最好的方法?
总结:如何实现一个任务队列,该任务队列可以由后台线程编辑并按顺序处理?
请帮助我弄清楚
编辑
使用vanOekel的SerialTaskQueue
类对我有帮助,现在我想将任务列表绑定到我的ListView
.
ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty);
显然,这是行不通的,因为它需要一个ObservableList.有一种优雅的方法吗?
我能想到的最简单的解决方案是将任务列表维护在执行程序之外,并使用回调向执行程序提供下一个任务(如果有) .不幸的是,它涉及到任务列表上的同步和AtomicBoolean
来指示任务正在执行.
回调只是一个Runnable
,它包装了要运行的原始任务,然后回调"以查看是否还有另一个任务要执行,如果有,请使用(后台)执行程序来执行.
需要同步,以使任务列表保持顺序并处于已知状态.任务列表可以同时由两个线程修改:通过在执行程序(后台)线程中运行的回调,以及通过UI前台线程执行的handleItemClicked
方法.反过来这意味着,例如,当任务列表为空时,永远不会完全知道它.为了使任务列表保持顺序并处于已知的固定状态,需要同步任务列表.
这仍然留下一个模糊的时刻来决定何时准备执行任务.这是AtomicBoolean
进入的地方:总是立即可以使用值集,并且其他任何线程都可以读取该值集,而compareAndSet
方法将始终确保只有一个线程获得确定".
将同步和AtomicBoolean
的使用结合在一起,可以创建一个带有关键部分"的方法,该方法可以由前台线程和后台线程同时调用以触发新任务的执行如果可能的话.下面的代码以一种可以存在这样一种方法(runNextTask
)的方式进行设计和设置.优良作法是使并发代码中的关键部分"尽可能简单明了(这反过来通常会导致有效的关键部分").
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class SerialTaskQueue {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
// all operations on this list must be synchronized on the list itself.
SerialTaskQueue tq = new SerialTaskQueue(executor);
try {
// test running the tasks one by one
tq.add(new SleepSome(10L));
Thread.sleep(5L);
tq.add(new SleepSome(20L));
tq.add(new SleepSome(30L));
Thread.sleep(100L);
System.out.println("Queue size: " + tq.size()); // should be empty
tq.add(new SleepSome(10L));
Thread.sleep(100L);
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdownNow();
}
}
// all lookups and modifications to the list must be synchronized on the list.
private final List<Runnable> tasks = new LinkedList<Runnable>();
// atomic boolean used to ensure only 1 task is executed at any given time
private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
private final Executor executor;
public SerialTaskQueue(Executor executor) {
this.executor = executor;
}
public void add(Runnable task) {
synchronized(tasks) { tasks.add(task); }
runNextTask();
}
private void runNextTask() {
// critical section that ensures one task is executed.
synchronized(tasks) {
if (!tasks.isEmpty()
&& executeNextTask.compareAndSet(true, false)) {
executor.execute(wrapTask(tasks.remove(0)));
}
}
}
private CallbackTask wrapTask(Runnable task) {
return new CallbackTask(task, new Runnable() {
@Override public void run() {
if (!executeNextTask.compareAndSet(false, true)) {
System.out.println("ERROR: programming error, the callback should always run in execute state.");
}
runNextTask();
}
});
}
public int size() {
synchronized(tasks) { return tasks.size(); }
}
public Runnable get(int index) {
synchronized(tasks) { return tasks.get(index); }
}
public Runnable remove(int index) {
synchronized(tasks) { return tasks.remove(index); }
}
// general callback-task, see https://stackoverflow.com/a/826283/3080094
static class CallbackTask implements Runnable {
private final Runnable task, callback;
public CallbackTask(Runnable task, Runnable callback) {
this.task = task;
this.callback = callback;
}
@Override public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
callback.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// task that just sleeps for a while
static class SleepSome implements Runnable {
static long startTime = System.currentTimeMillis();
private final long sleepTimeMs;
public SleepSome(long sleepTimeMs) {
this.sleepTimeMs = sleepTimeMs;
}
@Override public void run() {
try {
System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
Thread.sleep(sleepTimeMs);
System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
} catch (Exception e) {
e.printStackTrace();
}
}
private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
}
}
更新:如果需要按顺序执行多组任务,请此处. >
I know this question was answered many times, but I'm struggling to understand how it works.
So in my application the user must be able to select items which will be added to a queue (displayed in a ListView
using an ObservableList<Task>
) and each item needs to be processed sequentially by an ExecutorService
.
Also that queue should be editable (change the order and remove items from the list).
private void handleItemClicked(MouseEvent event) {
if (event.getClickCount() == 2) {
File item = listView.getSelectionModel().getSelectedItem();
Task<Void> task = createTask(item);
facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
Future result = executor.submit(task);
// where executor is an ExecutorService of which type?
try {
result.get();
} catch (Exception e) {
// ...
}
}
}
Tried it with executor = Executors.newFixedThreadPool(1)
but I don't have control over the queue.
I read about ThreadPoolExecutor
and queues, but I'm struggling to understand it as I'm quite new to Concurrency.
I need to run that method handleItemClicked
in a background thread, so that the UI does not freeze, how can I do that the best way?
Summed up: How can I implement a queue of tasks, which is editable and sequentially processed by a background thread?
Please help me figure it out
EDIT
Using the SerialTaskQueue
class from vanOekel helped me, now I want to bind the List of tasks to my ListView
.
ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty);
Obviously this doesn't work as it's expecting an ObservableList. There is an elegant way to do it?
The simplest solution I can think of is to maintain the task-list outside of the executor and use a callback to feed the executor the next task if it is available. Unfortunately, it involves synchronization on the task-list and an AtomicBoolean
to indicate a task executing.
The callback is simply a Runnable
that wraps the original task to run and then "calls back" to see if there is another task to execute, and if so, executes it using the (background) executor.
The synchronization is needed to keep the task-list in order and at a known state. The task-list can be modified by two threads at the same time: via the callback running in the executor's (background) thread and via handleItemClicked
method executed via the UI foreground thread. This in turn means that it is never exactly known when the task-list is empty for example. To keep the task-list in order and at a known fixed state, synchronization of the task-list is needed.
This still leaves an ambiguous moment to decide when a task is ready for execution. This is where the AtomicBoolean
comes in: a value set is always immediatly availabe and read by any other thread and the compareAndSet
method will always ensure only one thread gets an "OK".
Combining the synchronization and the use of the AtomicBoolean
allows the creation of one method with a "critical section" that can be called by both foreground- and background-threads at the same time to trigger the execution of a new task if possible. The code below is designed and setup in such a way that one such method (runNextTask
) can exist. It is good practice to make the "critical section" in concurrent code as simple and explicit as possible (which, in turn, generally leads to an efficient "critical section").
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class SerialTaskQueue {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
// all operations on this list must be synchronized on the list itself.
SerialTaskQueue tq = new SerialTaskQueue(executor);
try {
// test running the tasks one by one
tq.add(new SleepSome(10L));
Thread.sleep(5L);
tq.add(new SleepSome(20L));
tq.add(new SleepSome(30L));
Thread.sleep(100L);
System.out.println("Queue size: " + tq.size()); // should be empty
tq.add(new SleepSome(10L));
Thread.sleep(100L);
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdownNow();
}
}
// all lookups and modifications to the list must be synchronized on the list.
private final List<Runnable> tasks = new LinkedList<Runnable>();
// atomic boolean used to ensure only 1 task is executed at any given time
private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
private final Executor executor;
public SerialTaskQueue(Executor executor) {
this.executor = executor;
}
public void add(Runnable task) {
synchronized(tasks) { tasks.add(task); }
runNextTask();
}
private void runNextTask() {
// critical section that ensures one task is executed.
synchronized(tasks) {
if (!tasks.isEmpty()
&& executeNextTask.compareAndSet(true, false)) {
executor.execute(wrapTask(tasks.remove(0)));
}
}
}
private CallbackTask wrapTask(Runnable task) {
return new CallbackTask(task, new Runnable() {
@Override public void run() {
if (!executeNextTask.compareAndSet(false, true)) {
System.out.println("ERROR: programming error, the callback should always run in execute state.");
}
runNextTask();
}
});
}
public int size() {
synchronized(tasks) { return tasks.size(); }
}
public Runnable get(int index) {
synchronized(tasks) { return tasks.get(index); }
}
public Runnable remove(int index) {
synchronized(tasks) { return tasks.remove(index); }
}
// general callback-task, see https://stackoverflow.com/a/826283/3080094
static class CallbackTask implements Runnable {
private final Runnable task, callback;
public CallbackTask(Runnable task, Runnable callback) {
this.task = task;
this.callback = callback;
}
@Override public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
callback.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// task that just sleeps for a while
static class SleepSome implements Runnable {
static long startTime = System.currentTimeMillis();
private final long sleepTimeMs;
public SleepSome(long sleepTimeMs) {
this.sleepTimeMs = sleepTimeMs;
}
@Override public void run() {
try {
System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
Thread.sleep(sleepTimeMs);
System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
} catch (Exception e) {
e.printStackTrace();
}
}
private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
}
}
Update: if groups of tasks need to be executed serial, have a look at the adapted implementation here.
这篇关于在后台线程中运行的可编辑任务队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!