将给定ID的任务绑定到同一线程的线程池 [英] Thread pool that binds tasks for a given ID to the same thread

查看:396
本文介绍了将给定ID的任务绑定到同一线程的线程池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否存在线程池(在Java中)的任何实现,以确保在同一线程上执行相同逻辑ID的所有任务?

Are there any implementations of a thread pool (in Java) that ensures all tasks for the same logical ID are executed on the same thread?

逻辑I'如果在给定逻辑ID的特定线程上已经执行了任务,则在同一线程上调度具有相同ID的新任务。如果没有线程为同一个ID执行任务,则可以使用任何线程。

The logic I'm after is if there is already a task being executed on a specific thread for a given logical ID, then new tasks with the same ID are scheduled on the same thread. If there are no threads executing a task for the same ID then any thread can be used.

这将允许并行执行不相关ID的任务,但是相同的ID将以串行和提交的顺序执行。

This would allow tasks for unrelated IDs to be executed in parallel, but tasks for the same ID to be executed in serial and in the order submitted.

如果没有,是否有任何关于如何扩展的建议 ThreadPoolExecutor 获得此行为(如果可能的话)?

If not, are there any suggestions on how I might extend ThreadPoolExecutor to get this behaviour (if that's even possible)?

更新

花了更长时间考虑这个问题,我实际上并不要求在同一个线程上执行相同逻辑ID的任务,只是它们不能同时执行。

Having spent longer thinking about this, I don't actually require that tasks for the same logical ID get executed on the same thread, just that they don't get executed at the same time.

这方面的一个例子是一个处理客户订单的系统,可以同时处理多个订单,但不能同一个客户(同一个客户的所有订单)必须按顺序处理。)

An example for this would be a system that processed orders for customers, where it was OK to process multiple orders at the same time, but not for the same customer (and all orders for the same customer had to be processed in order).

我现在采取的方法是使用标准的ThreadPoo lExecutor,具有自定义的 BlockingQueue ,并使用自定义包装器包装 Runnable Runnable 包装器逻辑是:

The approach I'm taking at the moment is to use a standard ThreadPoolExecutor, with a customised BlockingQueue and also wrapping the Runnable with a custom wrapper. The Runnable wrapper logic is:


  1. 以原子方式尝试向并发'运行添加ID 'set( ConcurrentHashMap )查看同一ID的任务当前是否正在运行

    • 如果添加失败,请按任务回到队列的前面并立即返回

    • 如果成功,继续

  1. Atomically attempt to add ID to concurrent 'running' set (ConcurrentHashMap) to see if a task for the same ID is currently running
    • if add fails, push the task back on to the front of the queue and return immediately
    • if succeeeds, carry on

queue的 poll()方法然后只返回具有当前不在运行集中的ID的任务。

The queue's poll() methods then only return tasks that have an ID that is not currently in the 'running' set.

问题在于,我确信会有很多我没想过的极端情况,所以它需要很多的测试。

The trouble with this is that I'm sure there are going to be a lot of corner cases that I haven't thought about, so it's going to require a lot of testing.

推荐答案

创建一个执行器服务数组,每个服务器运行一个线程,并通过y的哈希码将队列条目分配给它们我们的商品ID。该数组可以是任何大小,具体取决于您最多要使用多少线程。

Create an array of executor services running one thread each and assign your queue entries to them by the hash code of your item id. The array can be of any size, depending on how many threads at most do you want to use.

这将限制我们可以使用执行程序服务但仍然允许使用其功能在不再需要时关闭唯一的线程(使用 allowCoreThreadTimeOut(true))并根据需要重新启动它。此外,所有排队的东西都可以在不重写的情况下工作。

This will restrict that we can use from the executor service but still allows to use its capability to shut down the only thread when no longer needed (with allowCoreThreadTimeOut(true)) and restart it as required. Also, all queuing stuff will work without rewriting it.

这篇关于将给定ID的任务绑定到同一线程的线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆