Java, divide incoming work uniformly via hashing in multithreaded environments

Problem description

I've implemented Java code that executes incoming tasks (as Runnable) on nThreads threads, picking the thread from the task's hashCode modulo nThreads. Ideally, the work should be spread uniformly among those threads. Specifically, each Task has a dispatchId string, and that is what gets hashed.

Here is the Java code snippet:

int nThreads = Runtime.getRuntime().availableProcessors(); // number of worker threads
Worker[] workers = new Worker[nThreads]; // Worker is a thread class that runs incoming tasks
...
Worker getWorker(String dispatchId) { // pick the worker thread for this task
    // clear the sign bit so the index is non-negative, then reduce modulo nThreads
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}

Important: In most cases a dispatchId is:

String dispatchId = "SomePrefix" + counter.next();

But I am concerned that taking the remainder modulo nThreads is not a good choice, because nThreads should be a prime number for a more uniform distribution of dispatchId keys.
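
The concern about the modulus can be checked empirically. The following is a minimal sketch (the class name BucketHistogram is hypothetical) that counts how many keys of the form "SomePrefix" + i land on each worker index, using the same formula as getWorker() above. It assumes counter.next() yields consecutive integers starting at 0.

```java
import java.util.Arrays;

class BucketHistogram {
    // Same index formula as getWorker(): clear the sign bit, then take the remainder.
    static int bucket(String dispatchId, int nThreads) {
        return (dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads;
    }

    // Count how many of the keys prefix + 0 .. prefix + (n - 1) map to each worker index.
    static int[] histogram(String prefix, int n, int nThreads) {
        int[] counts = new int[nThreads];
        for (int i = 0; i < n; i++) {
            counts[bucket(prefix + i, nThreads)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Compare a prime thread count with a power of two.
        for (int nThreads : new int[] {7, 8}) {
            System.out.println(nThreads + " threads: "
                + Arrays.toString(histogram("SomePrefix", 10_000, nThreads)));
        }
    }
}
```

For keys like these, changing only the last digit changes String.hashCode() by exactly the difference between the characters, so each run of ten consecutive counters produces ten consecutive hash codes. With 8 threads that means every index receives one or two keys from each run, so the static spread of keys is already fairly even; the imbalance described in Update 3 comes from how busy each dispatchId is, which a different modulus alone does not address.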

Are there any other options for spreading the work better?

Update 1:

Each Worker has a queue: Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue<>();

The worker gets tasks from it and executes them. Tasks can be added to this queue from other threads.
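
A worker as described in Update 1 can be sketched as a thread that drains its own FIFO queue. This is a hypothetical sketch, not the asker's actual Worker class, and it swaps ConcurrentLinkedQueue for a LinkedBlockingQueue so that an idle worker blocks in take() instead of spinning on poll().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Worker: one thread that runs the tasks in its own queue, one at a time.
class Worker extends Thread {
    // Assumption: a blocking queue instead of ConcurrentLinkedQueue, so idle workers sleep.
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    Worker(String name) {
        super(name);
        setDaemon(true); // do not keep the JVM alive just for idle workers
    }

    // Safe to call from any thread.
    void submit(Runnable task) {
        tasks.add(task);
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                Runnable task = tasks.take(); // blocks until a task is available
                try {
                    task.run(); // tasks run sequentially, in arrival order
                } catch (RuntimeException e) {
                    e.printStackTrace(); // keep the worker alive after a failing task
                }
            }
        } catch (InterruptedException e) {
            // interrupted while waiting for work: leave the loop and let the thread end
        }
    }
}
```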

Update 2:

Tasks with the same dispatchId can arrive multiple times, so we need to find their thread by dispatchId.

Most importantly, each Worker thread must process its incoming tasks sequentially; hence the Queue data structure in Update 1 above.
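
Updates 1 and 2 together (stable routing by dispatchId plus sequential processing per worker) map closely onto an array of single-thread executors from the JDK, since Executors.newSingleThreadExecutor() already pairs one thread with a FIFO queue. A minimal sketch, with HashDispatcher as a hypothetical name:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded executor per worker slot, chosen by hashing dispatchId.
class HashDispatcher {
    private final ExecutorService[] workers;

    HashDispatcher(int nThreads) {
        workers = new ExecutorService[nThreads];
        for (int i = 0; i < nThreads; i++) {
            // each single-thread executor runs its tasks one at a time, in submission order
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Same index formula as getWorker() in the question.
    int indexFor(String dispatchId) {
        return (dispatchId.hashCode() & Integer.MAX_VALUE) % workers.length;
    }

    void dispatch(String dispatchId, Runnable task) {
        workers[indexFor(dispatchId)].execute(task);
    }

    // Stop accepting tasks and wait for the queued ones to finish.
    void shutdownAndWait() throws InterruptedException {
        for (ExecutorService w : workers) w.shutdown();
        for (ExecutorService w : workers) w.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Because every id is pinned to one executor, per-id FIFO order holds, but a busy id still occupies its executor while the others may sit idle, which is exactly the problem raised in Update 3.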

Update 3: Also, some threads can be busy while others are idle. So we need to somehow decouple the queues from the threads while still keeping FIFO execution order for tasks with the same dispatchId.

Solution: I've implemented Ben Manes' idea (his answer below), the code can be found here.

Recommended answer

It sounds like you need FIFO ordering per dispatch id, so the ideal abstraction would be dispatch queues. That would also explain your concern that hashing does not give a uniform distribution: some dispatch queues may be much more active than others, so they end up unfairly balanced among the workers. By separating the queue from the worker, you keep the FIFO semantics and spread the work evenly.
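
One way to get a queue per dispatch id on plain JDK pieces is sketched below. It adapts the SerialExecutor example from the java.util.concurrent.Executor Javadoc, which is a different mechanism from the CompletableFuture chain in this answer but has the same effect; DispatchQueues is a hypothetical name. Each dispatchId gets its own SerialExecutor, all of them share one pool, and so no thread is tied to a particular queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Adapted from the Executor Javadoc: tasks given to one SerialExecutor run one at a time,
// in order, on whichever thread of the shared executor is free.
class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor executor;
    private Runnable active;

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override
    public synchronized void execute(final Runnable r) {
        tasks.add(() -> {
            try {
                r.run();
            } finally {
                scheduleNext(); // hand the next queued task for this id to the pool
            }
        });
        if (active == null) {
            scheduleNext(); // nothing running for this id yet: start now
        }
    }

    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}

// Hypothetical dispatcher: one logical FIFO queue per dispatchId over a shared thread pool.
class DispatchQueues {
    private final ConcurrentMap<String, SerialExecutor> queues = new ConcurrentHashMap<>();
    private final Executor pool;

    DispatchQueues(Executor pool) {
        this.pool = pool;
    }

    void dispatch(String dispatchId, Runnable task) {
        queues.computeIfAbsent(dispatchId, id -> new SerialExecutor(pool)).execute(task);
    }
}
```

As with the map in the answer, the queues map here is never pruned; with many short-lived ids, idle SerialExecutors would have to be removed periodically.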

HawtDispatch is a library, no longer actively developed, that provides this abstraction. It is Java 6 compatible.

A very simple Java 8 approach is to use CompletableFuture as the queuing mechanism, a ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for the computation. See EventDispatcher for an implementation of this idea in which registration is explicit. If your dispatch queues are more dynamic, you may need to prune the map periodically. The basic idea is as follows:

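import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = new ConcurrentHashMap<>();
Executor executor = ForkJoinPool.commonPool(); // or any other Executor

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  // compute() is atomic per key, so each task is chained onto the current tail of its queue
  return dispatchQueues.compute(queueName, (k, queue) -> (queue == null)
      ? CompletableFuture.runAsync(task, executor)
      // handle() ignores an earlier failure so one failing task does not skip all later ones
      : queue.handle((result, error) -> null).thenRunAsync(task, executor));
}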
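
A self-contained version of this idea, including the periodic pruning mentioned above, might look like the sketch below. KeyedDispatcher, prune() and size() are hypothetical names, and the handle() step is an addition so that a failing task does not make every later task for the same id complete exceptionally without running.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Hypothetical wrapper around the CompletableFuture chaining idea, plus a prune() step.
class KeyedDispatcher {
    private final ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = new ConcurrentHashMap<>();
    private final Executor executor;

    KeyedDispatcher(Executor executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> dispatch(String queueName, Runnable task) {
        return dispatchQueues.compute(queueName, (k, queue) -> (queue == null)
            ? CompletableFuture.runAsync(task, executor)
            // turn an earlier failure into normal completion so this task still runs
            : queue.handle((result, error) -> null).thenRunAsync(task, executor));
    }

    // Drop ids whose last task has finished. computeIfPresent is atomic per key,
    // so a concurrent dispatch() for the same id is never lost.
    void prune() {
        for (String key : dispatchQueues.keySet()) {
            dispatchQueues.computeIfPresent(key, (k, tail) -> tail.isDone() ? null : tail);
        }
    }

    int size() {
        return dispatchQueues.size();
    }
}
```

Pruning only a completed tail is safe because each future in a chain completes after its predecessor: once the stored tail is done, every earlier task for that id has finished, so the next dispatch can start a fresh chain without breaking FIFO order.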

Update (JDK 7)

A backport of the above idea to JDK 7, using Guava, would look something like this:

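import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Striped;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;

ListeningExecutorService executor = ... // e.g. MoreExecutors.listeningDecorator(someExecutorService)
Striped<Lock> locks = Striped.lock(256); // striped locks make the get-then-put atomic per queue name
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = new ConcurrentHashMap<>();

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      // run the task only after the previous task for this queue has completed
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        @Override
        public void run() {
          try {
            task.run();
          } finally {
            next.set(null); // complete even if the task failed, so the queue keeps moving
          }
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
    return future;
  } finally {
    lock.unlock();
  }
}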
