在后台线程中运行的可编辑任务队列 [英] Editable queue of tasks running in background thread

查看:60
本文介绍了在后台线程中运行的可编辑任务队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我知道这个问题已经回答了很多遍,但是我一直在努力了解它的工作原理.

因此,在我的应用程序中,用户必须能够选择要添加到队列中的项目(使用ObservableList<Task>显示在ListView中),并且每个项目都需要由ExecutorService顺序处理.

该队列也应该是可编辑的(更改顺序并从列表中删除项目).

private void handleItemClicked(MouseEvent event) {
    if (event.getClickCount() == 2) {
        File item = listView.getSelectionModel().getSelectedItem();
        Task<Void> task = createTask(item);
        facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
        Future result = executor.submit(task); 
        // where executor is an ExecutorService of which type?

        try {
            result.get();
        } catch (Exception e) {
            // ...
        }
    }
}

executor = Executors.newFixedThreadPool(1)进行了尝试,但我无法控制队列.
我读到有关ThreadPoolExecutor和队列的信息,但是由于对Concurrency还是很陌生,所以我很难理解它.

我需要在后台线程中运行该方法handleItemClicked,以使UI不会冻结,我该怎么做才是最好的方法?

总结:如何实现一个任务队列,该任务队列可以由后台线程编辑并按顺序处理?

请帮助我弄清楚

编辑 使用vanOekel的SerialTaskQueue类对我有帮助,现在我想将任务列表绑定到我的ListView.

ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty); 

显然,这是行不通的,因为它需要一个ObservableList.有一种优雅的方法吗?

解决方案

我能想到的最简单的解决方案是将任务列表维护在执行程序之外,并使用回调向执行程序提供下一个任务(如果有) .不幸的是,它涉及到任务列表上的同步和AtomicBoolean来指示任务正在执行.

回调只是一个Runnable,它包装了要运行的原始任务,然后回调"以查看是否还有另一个任务要执行,如果有,请使用(后台)执行程序来执行.

需要同步,以使任务列表保持顺序并处于已知状态.任务列表可以同时由两个线程修改:通过在执行程序(后台)线程中运行的回调,以及通过UI前台线程执行的handleItemClicked方法.反过来这意味着,例如,当任务列表为空时,永远不会完全知道它.为了使任务列表保持顺序并处于已知的固定状态,需要同步任务列表.

这仍然留下一个模糊的时刻来决定何时准备执行任务.这是AtomicBoolean进入的地方:总是立即可以使用值集,并且其他任何线程都可以读取该值集,而compareAndSet方法将始终确保只有一个线程获得确定".

将同步和AtomicBoolean的使用结合在一起,可以创建一个带有关键部分"的方法,该方法可以由前台线程和后台线程同时调用以触发新任务的执行如果可能的话.下面的代码以一种可以存在这样一种方法(runNextTask)的方式进行设计和设置.优良作法是使并发代码中的关键部分"尽可能简单明了(这反过来通常会导致有效的关键部分").

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class SerialTaskQueue {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // all operations on this list must be synchronized on the list itself.
        SerialTaskQueue tq = new SerialTaskQueue(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome(10L));
            Thread.sleep(5L);
            tq.add(new SleepSome(20L));
            tq.add(new SleepSome(30L));

            Thread.sleep(100L);
            System.out.println("Queue size: " + tq.size()); // should be empty
            tq.add(new SleepSome(10L));

            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // all lookups and modifications to the list must be synchronized on the list.
    private final List<Runnable> tasks = new LinkedList<Runnable>();
    // atomic boolean used to ensure only 1 task is executed at any given time
    private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
    private final Executor executor;

    public SerialTaskQueue(Executor executor) {
        this.executor = executor;
    }

    public void add(Runnable task) {

        synchronized(tasks) { tasks.add(task); }
        runNextTask();
    }

    private void runNextTask() {
        // critical section that ensures one task is executed.
        synchronized(tasks) {
            if (!tasks.isEmpty()
                    && executeNextTask.compareAndSet(true, false)) {
                executor.execute(wrapTask(tasks.remove(0)));
            }
        }
    }

    private CallbackTask wrapTask(Runnable task) {

        return new CallbackTask(task, new Runnable() {
            @Override public void run() {
                if (!executeNextTask.compareAndSet(false, true)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask();
            }
        });
    }

    public int size() {
        synchronized(tasks) { return tasks.size(); }
    }

    public Runnable get(int index) {
        synchronized(tasks) { return tasks.get(index); }
    }

    public Runnable remove(int index) {
        synchronized(tasks) { return tasks.remove(index); }
    }

    // general callback-task, see https://stackoverflow.com/a/826283/3080094
    static class CallbackTask implements Runnable {

        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override public void run() {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {

        static long startTime = System.currentTimeMillis();

        private final long sleepTimeMs;
        public SleepSome(long sleepTimeMs) {
            this.sleepTimeMs = sleepTimeMs;
        }
        @Override public void run() {
            try { 
                System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
    }
}

更新:如果需要按顺序执行多组任务,请此处. >

I know this question was answered many times, but I'm struggling to understand how it works.

So in my application the user must be able to select items which will be added to a queue (displayed in a ListView using an ObservableList<Task>) and each item needs to be processed sequentially by an ExecutorService.

Also that queue should be editable (change the order and remove items from the list).

private void handleItemClicked(MouseEvent event) {
    if (event.getClickCount() == 2) {
        File item = listView.getSelectionModel().getSelectedItem();
        Task<Void> task = createTask(item);
        facade.getTaskQueueList().add(task); // this list is bound to a ListView, where it can be edited
        Future result = executor.submit(task); 
        // where executor is an ExecutorService of which type?

        try {
            result.get();
        } catch (Exception e) {
            // ...
        }
    }
}

Tried it with executor = Executors.newFixedThreadPool(1) but I don't have control over the queue.
I read about ThreadPoolExecutor and queues, but I'm struggling to understand it as I'm quite new to Concurrency.

I need to run that method handleItemClicked in a background thread, so that the UI does not freeze, how can I do that the best way?

Summed up: How can I implement a queue of tasks, which is editable and sequentially processed by a background thread?

Please help me figure it out

EDIT Using the SerialTaskQueue class from vanOekel helped me, now I want to bind the List of tasks to my ListView.

ListProperty<Runnable> listProperty = new SimpleListProperty<>();
listProperty.set(taskQueue.getTaskList()); // getTaskList() returns the LinkedList from SerialTaskQueue
queueListView.itemsProperty().bind(listProperty); 

Obviously this doesn't work as it's expecting an ObservableList. There is an elegant way to do it?

解决方案

The simplest solution I can think of is to maintain the task-list outside of the executor and use a callback to feed the executor the next task if it is available. Unfortunately, it involves synchronization on the task-list and an AtomicBoolean to indicate a task executing.

The callback is simply a Runnable that wraps the original task to run and then "calls back" to see if there is another task to execute, and if so, executes it using the (background) executor.

The synchronization is needed to keep the task-list in order and at a known state. The task-list can be modified by two threads at the same time: via the callback running in the executor's (background) thread and via handleItemClicked method executed via the UI foreground thread. This in turn means that it is never exactly known when the task-list is empty for example. To keep the task-list in order and at a known fixed state, synchronization of the task-list is needed.

This still leaves an ambiguous moment to decide when a task is ready for execution. This is where the AtomicBoolean comes in: a value set is always immediatly availabe and read by any other thread and the compareAndSet method will always ensure only one thread gets an "OK".

Combining the synchronization and the use of the AtomicBoolean allows the creation of one method with a "critical section" that can be called by both foreground- and background-threads at the same time to trigger the execution of a new task if possible. The code below is designed and setup in such a way that one such method (runNextTask) can exist. It is good practice to make the "critical section" in concurrent code as simple and explicit as possible (which, in turn, generally leads to an efficient "critical section").

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class SerialTaskQueue {

    public static void main(String[] args) {

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // all operations on this list must be synchronized on the list itself.
        SerialTaskQueue tq = new SerialTaskQueue(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome(10L));
            Thread.sleep(5L);
            tq.add(new SleepSome(20L));
            tq.add(new SleepSome(30L));

            Thread.sleep(100L);
            System.out.println("Queue size: " + tq.size()); // should be empty
            tq.add(new SleepSome(10L));

            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // all lookups and modifications to the list must be synchronized on the list.
    private final List<Runnable> tasks = new LinkedList<Runnable>();
    // atomic boolean used to ensure only 1 task is executed at any given time
    private final AtomicBoolean executeNextTask = new AtomicBoolean(true);
    private final Executor executor;

    public SerialTaskQueue(Executor executor) {
        this.executor = executor;
    }

    public void add(Runnable task) {

        synchronized(tasks) { tasks.add(task); }
        runNextTask();
    }

    private void runNextTask() {
        // critical section that ensures one task is executed.
        synchronized(tasks) {
            if (!tasks.isEmpty()
                    && executeNextTask.compareAndSet(true, false)) {
                executor.execute(wrapTask(tasks.remove(0)));
            }
        }
    }

    private CallbackTask wrapTask(Runnable task) {

        return new CallbackTask(task, new Runnable() {
            @Override public void run() {
                if (!executeNextTask.compareAndSet(false, true)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask();
            }
        });
    }

    public int size() {
        synchronized(tasks) { return tasks.size(); }
    }

    public Runnable get(int index) {
        synchronized(tasks) { return tasks.get(index); }
    }

    public Runnable remove(int index) {
        synchronized(tasks) { return tasks.remove(index); }
    }

    // general callback-task, see https://stackoverflow.com/a/826283/3080094
    static class CallbackTask implements Runnable {

        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override public void run() {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {

        static long startTime = System.currentTimeMillis();

        private final long sleepTimeMs;
        public SleepSome(long sleepTimeMs) {
            this.sleepTimeMs = sleepTimeMs;
        }
        @Override public void run() {
            try { 
                System.out.println(tdelta() + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta() + "Slept for " + sleepTimeMs + " ms.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String tdelta() { return String.format("% 4d ", (System.currentTimeMillis() - startTime)); }
    }
}

Update: if groups of tasks need to be executed serial, have a look at the adapted implementation here.

这篇关于在后台线程中运行的可编辑任务队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆