Java BlockingQueue有批处理吗? [英] Java BlockingQueue with batching?

查看:159
本文介绍了Java BlockingQueue有批处理吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对与Java BlockingQueue相同的数据结构感兴趣,不同之处在于它必须能够批处理队列中的对象.换句话说,我希望生产者能够将对象放入队列,但是将消费者块放在 take()上,直到队列达到特定大小(批处理大小)为止.

然后,一旦队列达到批处理大小,生产者必须在 put()上阻塞,直到消费者消耗掉队列中的所有元素(在这种情况下,生产者将开始生产)再次,消费者封锁直到再次到达批次为止.

是否存在类似的数据结构?还是我应该写(我不介意),如果那里有东西,我就是不想浪费时间.


更新

也许可以澄清一下事情:

情况将始终如下.可以有多个生产者将项目添加到队列中,但是从队列中提取项目的消费者永远不会超过一个.

现在,问题在于并行和串行有多个这样的设置.换句话说,生产者为多个队列生产商品,而消费者本身也可以是生产者.可以更容易地将其视为生产者,消费者生产者以及最终消费者的有向图.

生产者应该阻塞直到队列为空(@Peter Lawrey)的原因是因为每个队列都将在线程中运行.如果您仅在空间可用时让它们简单地生产,就会出现这样的情况:您有太多线程试图一次处理太多东西.

也许将其与执行服务结合起来可以解决问题?

解决方案

我建议您使用 解决方案

I would suggest you use BlockingQueue.drainTo(Collection, int). You can use it with take() to ensure you get a minimum number of elements.

The advantage of using this approach is that your batch size grows dynamically with the workload and the producer doesn't have to block when the consumer is busy. i.e. it self optimises for latency and throughput.


To implement exactly as asked (which I think is a bad idea) you can use a SynchronousQueue with a busy consuming thread.

i.e. the consuming thread does a

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

The producer will block when ever the consumer is busy.

这篇关于Java BlockingQueue有批处理吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆