并行并顺序地处理Java任务 [英] Processing tasks in parallel and sequentially Java
问题描述
在我的程序中,用户可以通过界面触发不同的任务,这需要一些时间来处理.因此,它们是由线程执行的.到目前为止,我已经实现了它,这样我就拥有了一个执行程序,该执行程序的一个线程一个接一个地执行所有任务.但是现在我想对所有内容进行并行处理.
In my program, the user can trigger different tasks via an interface, which take some time to process. Therefore they are executed by threads. So far I have implemented it so that I have an executer with one thread that executes all tasks one after the other. But now I would like to parallelize everything a little bit.
即我想并行运行任务,除非它们具有相同的路径,那么我想按顺序运行它们.例如,我的池中有10个线程,当一个任务进入时,该任务应分配给当前正在使用相同路径处理任务的工作程序.如果当前没有工作人员正在处理具有相同路径的任务,那么该任务应该由当前空闲的工作人员处理.
i.e. I would like to run tasks in parallel, except if they have the same path, then I want to run them sequentially. For example, I have 10 threads in my pool and when a task comes in, the task should be assigned to the worker which is currently processing a task with the same path. If no task with the same path is currently being processed by a worker, then the task should be processed by a currently free worker.
其他信息:任务是在本地文件系统中的文件上执行的任何类型的任务.例如,重命名文件.因此,任务具有属性 path
.而且我不想在同一文件上同时执行两个任务,因此具有相同路径的此类任务应按顺序执行.
Additional info: A task is any type of task that is executed on a file in the local file system. For example, renaming a file. Therefore, the task have the attribute path
. And I don't want to execute two tasks on the same file at the same time, so such tasks with the same paths should be performed sequentially.
这是我的示例代码,但有工作要做:
Here is my sample code but there is work to do:
我的问题之一是,我需要一种安全的方法来检查工作程序当前是否正在运行并获取当前正在运行的工作程序的路径.安全起见,不会发生同时访问问题或其他线程问题.
public class TasksOrderingExecutor {
public interface Task extends Runnable {
//Task code here
String getPath();
}
private static class Worker implements Runnable {
private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
//some variable or mechanic to give the actual path of the running tasks??
private volatile boolean stopped;
void schedule(Task task) {
tasks.add(task);
}
void stop() {
stopped = true;
}
@Override
public void run() {
while (!stopped) {
try {
Task task = tasks.take();
task.run();
} catch (InterruptedException ie) {
// perhaps, handle somehow
}
}
}
}
private final Worker[] workers;
private final ExecutorService executorService;
/**
* @param queuesNr nr of concurrent task queues
*/
public TasksOrderingExecutor(int queuesNr) {
Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
workers = new Worker[queuesNr];
for (int i = 0; i < queuesNr; i++) {
Worker worker = new Worker();
executorService.submit(worker);
workers[i] = worker;
}
}
public void submit(Task task) {
Worker worker = getWorker(task);
worker.schedule(task);
}
public void stop() {
for (Worker w : workers) w.stop();
executorService.shutdown();
}
private Worker getWorker(Task task) {
//check here if a running worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
return workers[task.getPath() //HERE I NEED HELP//];
}
}
推荐答案
您所需要的只是参与者的哈希图,并以文件路径为键.不同的参与者将并行运行,而具体的参与者将按顺序处理任务.您的解决方案是错误的,因为Worker类使用阻塞操作 take
但在有限的线程池中执行,这可能导致线程饥饿(一种死锁).演员在等待下一条消息时不会阻塞.
all you need is a hash map of actors, with file path as a key. Different actors would run in parallel, and concrete actor would handle tasks sequentially.
Your solution is wrong because Worker class uses blocking operation take
but is executed in a limited thread pool, which may lead to a thread starvation (a kind of deadlock). Actors do not block when waiting for next message.
import org.df4j.core.dataflow.ClassicActor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
public class TasksOrderingExecutor {
public static class Task implements Runnable {
private final String path;
private final String task;
public Task(String path, String task) {
this.path = path;
this.task = task;
}
//Task code here
String getPath() {
return path;
}
@Override
public void run() {
System.out.println(path+"/"+task+" started");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
System.out.println(path+"/"+task+" stopped");
}
}
static class Worker extends ClassicActor<Task> {
@Override
protected void runAction(Task task) throws Throwable {
task.run();
}
}
private final ExecutorService executorService;
private final Map<String,Worker> workers = new HashMap<String,Worker>(){
@Override
public Worker get(Object key) {
return super.computeIfAbsent((String) key, (k) -> {
Worker res = new Worker();
res.setExecutor(executorService);
res.start();
return res;
});
}
};
/**
* @param queuesNr nr of concurrent task queues
*/
public TasksOrderingExecutor(int queuesNr) {
executorService = ForkJoinPool.commonPool();
}
public void submit(Task task) {
Worker worker = getWorker(task);
worker.onNext(task);
}
public void stop() throws InterruptedException {
for (Worker w : workers.values()) {
w.onComplete();
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
private Worker getWorker(Task task) {
//check here if a runnig worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
return workers.get(task.getPath());
}
public static void main(String[] args) throws InterruptedException {
TasksOrderingExecutor orderingExecutor = new TasksOrderingExecutor(20);
orderingExecutor.submit(new Task("path1", "task1"));
orderingExecutor.submit(new Task("path1", "task2"));
orderingExecutor.submit(new Task("path2", "task1"));
orderingExecutor.submit(new Task("path3", "task1"));
orderingExecutor.submit(new Task("path2", "task2"));
orderingExecutor.stop();
}
}
执行协议表明具有相同密钥的任务是顺序执行的,具有不同密钥的任务是并行执行的:
The protocol of execution shows that tasks with te same key are executed sequentially and tasks with different keys are executed in parallel:
path3/task1 started
path2/task1 started
path1/task1 started
path3/task1 stopped
path2/task1 stopped
path1/task1 stopped
path2/task2 started
path1/task2 started
path2/task2 stopped
path1/task2 stopped
我使用了自己的actor库 DF4J ,但可以使用任何其他actor库.
I used my own actor library DF4J, but any other actor library can be used.
这篇关于并行并顺序地处理Java任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!