Deterministic assignment of tasks to threads using ExecutorService


Question

Given an ExecutorService with a fixed pool of threads, is it possible to guarantee a deterministic assignment of tasks to threads? More precisely, assume there are just two threads, pool-thread-0 and pool-thread-1, and a collection of 2 tasks to be executed. What I wish to achieve is that the former thread always executes the first task, while the latter handles the remaining one.

Here is an example:

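import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2,
                new ThreadFactoryBuilder().setNameFormat("pool-thread-%d").build());

        for (int i = 0; i < 5; i++) {
            List<Callable<Integer>> callables = ImmutableList.of(createCallable(1), createCallable(2));
            // invokeAll blocks until both tasks of this round have completed
            executorService.invokeAll(callables);
        }
        executorService.shutdown(); // otherwise the non-daemon pool threads keep the JVM alive
    }

    public static Callable<Integer> createCallable(final int task) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(1000); // sleep is static, so call it on the class
                System.out.println(Thread.currentThread().getName() + " executes task num: " + task);
                return task;
            }
        };
    }
}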

Sample output on my machine:

pool-thread-0 executes task num: 1
pool-thread-1 executes task num: 2

pool-thread-0 executes task num: 2
pool-thread-1 executes task num: 1

pool-thread-0 executes task num: 2
pool-thread-1 executes task num: 1

pool-thread-0 executes task num: 2
pool-thread-1 executes task num: 1

pool-thread-0 executes task num: 1
pool-thread-1 executes task num: 2

In a nutshell, I wish to ensure that pool-thread-0 always executes the first task. Any help will be greatly appreciated!

Recommended answer

ExecutorService is not designed to provide thread affinity for its Callable/Runnable tasks. One could argue that this is the whole point: the API lets programmers deal with the description of the work (the Callable) and not with thread handling at all.

Your design as it stands, where "there is a random data generator associated with each thread", is not suited to ExecutorService, for three reasons that I can see:


  1. You cannot control which threads will be created (or destroyed!) and when. What if one crashes? The pool will recreate it, but which random generator will the replacement get? So there is no reliable way of saying "this thread has this generator", much less "the second thread has this generator", because there may not even be a second thread (what if each task is so quick that tasks are processed faster than you dispatch them?).

  2. You do not control which tasks will be executed when. Well, with Executors.newFixedThreadPool you do, to the extent that tasks are dispatched in order of submission. But for all you know, the OS scheduler may give all priority to thread 1, which will end up doing all the work while thread 2 does nothing at all (or any proportion in between).

  3. The only way to pass a "data generator" to a thread is to override the ThreadFactory of the executor service. Otherwise you have no access to the thread instances (apart from inside the callables themselves while they run). So to associate a particular generator with a particular thread, you would have to know which thread number you are currently creating. That is easy if you count threads, but difficult if you are trying to know which Callable this thread is intended for (see point 2).
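As a rough illustration of point 3, the sketch below counts threads in a custom ThreadFactory and hands each new thread its own seeded generator. The class name CountingThreadFactoryDemo, the GENERATOR ThreadLocal and the seeding scheme (base seed plus thread index) are assumptions made for this example; they do not come from the question.

```java
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingThreadFactoryDemo {
    // Hypothetical holder: each pool thread finds its own generator here.
    public static final ThreadLocal<Random> GENERATOR = new ThreadLocal<>();

    // Builds a factory that numbers the threads it creates and seeds a
    // generator per thread from that number (an assumed seeding scheme).
    public static ThreadFactory seededFactory(long baseSeed) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            int index = counter.getAndIncrement();
            return new Thread(() -> {
                GENERATOR.set(new Random(baseSeed + index)); // runs on the new thread
                runnable.run();                              // then enters the pool's worker loop
            }, "pool-thread-" + index);
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2, seededFactory(42L));
        for (int task = 1; task <= 4; task++) {
            final int id = task;
            pool.submit(() -> System.out.println(Thread.currentThread().getName()
                    + " runs task " + id + " and draws " + GENERATOR.get().nextInt(100)));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The factory knows which thread number it is creating, but nothing here decides which task lands on which thread, so the value a given task draws can still change from run to run. That is exactly the problem described in point 2.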

So I would strongly suggest that you define some other way of associating your work units with your data generators, because the thread instance is in general not reliable, at least not through an executor service. For example, when you say


"I need to provide that the combinations of threads and data they process are repeatable."

I understand that you will always dispatch a certain number of Callables, and that each of them needs to work on a specific set of data issued by a specific generator. Say we have a given number of tasks and 3 generators: task N will use generator N % 3.

For the results to be repeatable, you further need tasks that use the same generator not to execute concurrently (is that what you seek to achieve with thread affinity?).

Several patterns can achieve that.

Create 3 tasks in your executor service, each of which listens to a BlockingQueue (its private waiting list) and has its own private generator. These are the consumers.
Make your main thread a producer: when it creates work unit number N (what used to be a Callable in your original design), it dispatches it to waiting queue number N % 3. That's it: each consumer receives its own data to compute, sequentially and in the order you wish. You have achieved "affinity".
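A minimal sketch of this producer/consumer arrangement might look as follows. The class name QueueAffinityDemo, the STOP marker used to end each consumer, the fixed seeds and the "consumer:unit:value" log format are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueAffinityDemo {
    private static final int STOP = -1; // hypothetical marker telling a consumer to exit

    // Processes unitCount work units on 3 consumers; returns "consumer:unit:value" entries.
    public static List<String> run(int unitCount) throws InterruptedException {
        final int consumers = 3;
        List<String> log = new CopyOnWriteArrayList<>();
        List<BlockingQueue<Integer>> queues = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int c = 0; c < consumers; c++) {
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            final int id = c;
            final Random generator = new Random(id); // private generator, assumed fixed seed
            pool.execute(() -> {
                try {
                    // Take units one at a time, in the order the producer queued them.
                    for (int unit = queue.take(); unit != STOP; unit = queue.take()) {
                        log.add(id + ":" + unit + ":" + generator.nextInt(100));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Producer: work unit N goes to waiting queue N % 3.
        for (int n = 0; n < unitCount; n++) {
            queues.get(n % consumers).put(n);
        }
        for (BlockingQueue<Integer> queue : queues) {
            queue.put(STOP);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run(9).forEach(System.out::println);
    }
}
```

The order in which entries from different consumers are interleaved in the log can vary between runs, but the units each consumer sees, and the values its generator produces for them, should not.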

Alternatively, first refactor your callables so that each holds a reference to the generator it needs to use. Then, on your main thread, build the list of tasks to run for each generator.
Dispatch, from the main thread, the first task for each generator.
At the end of each callable, have the Callable dispatch the next work unit from its list.
Be careful not to lock yourself out, though: if you dispatch callables from callables, do not wait for their results, because waiting prevents the current Callable from finishing, which in turn prevents the newly dispatched ones from executing. That is a deadlock.
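The chained variant could be sketched like this. The names ChainedDispatchDemo and dispatchNext, the CountDownLatch used by the main thread to know when everything is done, and the fixed seeds are assumptions made for the example. Only the main thread waits; tasks never wait on each other.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainedDispatchDemo {
    // Processes unitCount work units for 3 generators; returns "generator:unit:value" entries.
    public static List<String> run(int unitCount) throws InterruptedException {
        final int generators = 3;
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(unitCount);

        // Main thread: build the ordered list of work units for each generator.
        List<Queue<Integer>> pending = new ArrayList<>();
        for (int g = 0; g < generators; g++) {
            pending.add(new ArrayDeque<>());
        }
        for (int n = 0; n < unitCount; n++) {
            pending.get(n % generators).add(n);
        }

        // Main thread: dispatch only the first unit of each generator.
        for (int g = 0; g < generators; g++) {
            dispatchNext(pool, g, pending.get(g), new Random(g), log, done);
        }

        done.await();    // the main thread may wait; the tasks themselves never do
        pool.shutdown();
        return log;
    }

    private static void dispatchNext(ExecutorService pool, int id, Queue<Integer> units,
                                     Random generator, List<String> log, CountDownLatch done) {
        Integer unit = units.poll();
        if (unit == null) {
            return; // this generator's list is exhausted
        }
        pool.execute(() -> {
            log.add(id + ":" + unit + ":" + generator.nextInt(100));
            // Fire and forget: submit the next unit without waiting for its result.
            dispatchNext(pool, id, units, generator, log, done);
            done.countDown();
        });
    }

    public static void main(String[] args) throws InterruptedException {
        run(9).forEach(System.out::println);
    }
}
```

Because each task submits its successor only after finishing its own work, two units of the same generator never run concurrently, even though the pool here has fewer threads than there are generators.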

With either of these two approaches, you have no guarantee about which task finishes first or last, but you are guaranteed that the work units you dispatch are predictably associated with a data generator you control, and that, for each generator, they execute in the order you dispatch them. Hopefully that is enough.
