多线程执行,保留完成的工作项的顺序 [英] Multithreaded execution where order of finished Work Items is preserved

查看:77
本文介绍了多线程执行,保留完成的工作项的顺序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个工作单元流程,可以将它们称为工作项",这些工作项是按顺序处理的(现在).我想通过多线程工作来加快处理速度.

I have a flow of units of work, lets call them "Work Items" that are processed sequentially (for now). I'd like to speed up processing by doing the work multithreaded.

约束:这些工作项按特定顺序排列,在处理过程中该顺序无关紧要-但是一旦处理完成,必须恢复该顺序.

Constraint: Those work items come in a specific order, during processing the order is not relevant - but once processing is finished the order must be restored.

类似这样的东西:

   |.|
   |.|
   |4|
   |3|
   |2|    <- incoming queue
   |1|
  / | \
 2  1  3  <- worker threads
  \ | /
   |3|
   |2|    <- outgoing queue
   |1|

我想用Java解决此问题,最好不使用Executor Services,Futures等,但是要使用基本的并发方法,例如wait(),notify()等.

I would like to solve this problem in Java, preferably without Executor Services, Futures, etc., but with basic concurrency methods like wait(), notify(), etc.

原因是:我的工作项"非常小且细粒,它们每个完成处理的时间约为0.2毫秒.因此,我担心使用java.util.concurrent.*中的东西可能会引入大量开销并减慢我的代码速度.

Reason is: My Work Items are very small and fine grained, they finish processing in about 0.2 milliseconds each. So I fear using stuff from java.util.concurrent.* might introduce way to much overhead and slow my code down.

到目前为止,我发现的所有示例在处理过程中都保留了顺序(在我的情况下是无关紧要的),并且在处理后并不关心顺序(在我的情况下至关重要).

The examples I found so far all preserve the order during processing (which is irrelevant in my case) and didn't care about order after processing (which is crucial in my case).

推荐答案

这是我在上一个项目中解决您的问题的方法(但是 with java.util.concurrent):

This is how I solved your problem in a previous project (but with java.util.concurrent):

(1)WorkItem类执行实际的工作/处理:

(1) WorkItem class does the actual work/processing:

public class WorkItem implements Callable<WorkItem> {
    Object content;
    public WorkItem(Object content) {
        super();
        this.content = content;
    }

    public WorkItem call() throws Exception {
        // getContent() + do your processing
        return this;
    }
}

(2)此类将工作项放入队列中并启动处理:

(2) This class puts Work Items in a queue and initiates processing:

public class Producer {
    ...
    public Producer() {
        super();
        workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
        completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
        workerThread = new Thread(new Worker(workerQueue));
        workerThread.start();
    }

    public void send(Object o) throws Exception {
        WorkItem workItem = new WorkItem(o);
        Future<WorkItem> future = completionService.submit(workItem);
        workerQueue.put(future);
    }
}

(3)处理完成后,在此处将工作项出队:

(3) Once processing is finished the Work Items are dequeued here:

public class Worker implements Runnable {
    private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null;

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
        super();
        this.workerQueue = workerQueue;
    }

    public void run() {
        while (true) {
            Future<WorkItem> fwi = workerQueue.take(); // deqeueue it
            fwi.get(); // wait for it till it has finished processing
        }
    }
}

(4)这是您将如何使用代码中的内容并提交新作品的方法:

(4) This is how you would use the stuff in your code and submit new work:

public class MainApp {
    public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++)
            p.send(i);
    }
}

这篇关于多线程执行,保留完成的工作项的顺序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆