Java,通过在多线程evnironments中的哈希统一传入的工作 [英] Java, divide incoming work uniformly via hashing in multithreaded evnironments

查看:431
本文介绍了Java,通过在多线程evnironments中的哈希统一传入的工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经实现了一个java代码来执行传入的任务( Runnable )与n Threads基于他们的hashCode模块 nThreads 。工作应该在这些线程之间理想地均匀地传播。
具体来说,我们有一个 dispatchId 作为每个任务的字符串。



snippet:

  int nThreads = Runtime.getRuntime()。availableProcessors //线程数量
Worker [] workers = new Worker [nThreads]; //这些线程,Worker只是一个可以运行传入任务的线程类
...
worker getWorker(String dispatchId){//为此任务获取线程
return workers [ dispatchId.hashCode()& Integer.MAX_VALUE)%nThreads];重要提示:在大多数情况下,dispatchId是:

 字符串dispatchId ='SomePrefix'+ counter.next()

但是,我担心由nThreads进行模除法不是一个好的选择,因为nThreads应该是一个更均匀分布dispatId键的素数。



更新1:



每个Worker都有一个队列:
队列< RunnableWrapper> tasks = new ConcurrentLinkedQueue();



工人从中获取任务并执行它们。



更新2:



具有相同 dispatchId 的任务可能会出现多次,因此我们需要通过 dispatchId 找到他们的线程。 p>

最重要的是,每个Worker线程都必须按顺序处理它的传入任务。因此,在上面的更新1中有数据结构Queue。



Update 3:
此外,而其他是免费的。因此,我们需要以某种方式将队列与线程解耦,但是为任务执行保持相同的 dispatchId 的FIFO顺序。



解决方案:
我实现了Ben Manes的想法(下面的答案),代码可以在这里

解决方案

这听起来像你需要每个dispatch id的FIFO排序,所以理想的是有dispatch队列作为抽象。这将解释你对散列为不提供均匀分布的关注,因为一些调度队列可能比其他调度队列更活跃,并且在工作者之间不公平地平衡。



提供此抽象的非活动库是 HawtDispatch 。它与Java 6兼容。



一个非常简单的Java 8方法是使用 CompletableFuture 作为排队机制, ConcurrentHashMap 用于注册,而Executor(例如 ForkJoinPool )。请参阅 EventDispatcher 来实现此构思,其中注册是显式的。如果您的调度员更具动态性,那么您可能需要定期修剪地图。基本想法如下。

  ConcurrentMap< String,CompletableFuture< Void> dispatchQueues = ... 

public CompletableFuture< Void> dispatch(String queueName,Runnable task){
return dispatchQueues.compute(queueName,(k,queue) - > {
return(queue == null)
?CompletableFuture.runAsync )
:queue.thenRunAsync(task);
});
}

更新(JDK7) b
$ b

上面的想法的一个反向端口将被Guava翻译成类似

  ListeningExecutorService executor = ... 
Striped< Lock> locks = Striped.lock(256);
ConcurrentMap< String,ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName,final Runnable task){
Lock lock = locks.get(queueName);
lock.lock();
try {
ListenableFuture<?> future = dispatchQueues.get(queueName);
if(future == null){
future = executor.submit(task);
} else {
final SettableFuture< Void> next = SettableFuture.create();
future.addListener(new Runnable(){
try {
task.run();
} finally {
next.set(null);
}
},executor);
future = next;
}
dispatchQueues.put(queueName,future);
} finally {
lock.unlock();
}
}


I've implemented a java code to execute incoming tasks (as Runnable) with n Threads based on their hashCode module nThreads. The work should spread, ideally - uniformly, among those threads. Specifically, we have a dispatchId as a string for each Task.

Here is this java code snippet:

int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}

Important: In most cases a dispatchId is:

String dispatchId = 'SomePrefix' + counter.next()

But, I have a concern that modulo division by nThreads is not a good choice, because nThreads should be a prime number for a more uniform distribution of dispatId keys.

Are there any other options on how to spread the work better?

Update 1:

Each Worker has a queue: Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();

The worker gets tasks from it and executes them. Tasks can be added to this queue from other threads.

Update 2:

Tasks with the same dispatchId can come in multiple times, therefore we need to find their thread by dispatchId.

Most importantly, each Worker thread must process its incoming tasks sequentially. Hence, there is data structure Queue in the update 1 above.

Update 3: Also, some threads can be busy, while others are free. Thus, we need to somehow decouple Queues from Threads, but maintain the FIFO order for the same dispatchId for tasks execution.

Solution: I've implemented Ben Manes' idea (his answer below), the code can be found here.

解决方案

It sounds like you need FIFO ordering per dispatch id, so the ideal would be to have dispatch queues as the abstraction. That would explain your concern about hashing as not providing uniform distribution, as some dispatch queues may be more active than others and unfairly balanced among workers. By separating the queue from the worker, you retain FIFO semantics and evenly spread out the work.

An inactive library that provides this abstraction is HawtDispatch. It is Java 6 compatible.

A very simple Java 8 approach is to use CompletableFuture as a queuing mechanism, ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for computing. See EventDispatcher for an implementation of this idea, where registration is explicit. If your dispatchers are more dynamic then you may need to periodically prune the map. The basic idea is as follows.

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  return dispatchQueues.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
  });
}

Update (JDK7)

A backport of the above idea would be translated with Guava into something like,

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        try {
          task.run();
        } finally {
          next.set(null);
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
  } finally {
    lock.unlock();
  }
}

这篇关于Java,通过在多线程evnironments中的哈希统一传入的工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆