Concurrent processing of a single InputStream with independent consumers


Question

I need to spawn N consumer threads that process the same InputStream concurrently, e.g. transform it somehow, calculate a checksum or a digital signature, etc. These consumers do not depend on each other, and all of them use third-party libraries that accept an InputStream as their source of data.

So what I can do is create an implementation of InputStream that will:

  • read a chunk of data from the "parent" stream
  • unblock the consumers
  • wait until every consumer has read the whole chunk
  • read the next chunk

While this looks simple, it raises various problems: livelock when a certain consumer dies, having to implement all of the InputStream methods, controlling the fork/join of the consumers themselves using barriers/latches, etc.

I'd prefer either to use something mature enough (googling didn't turn up any results, so maybe my google-fu isn't good enough?) or not to bother and copy the entire "source" stream into a temporary file and use that as the source of data. The latter solution seems more reliable, but may end up creating gigabyte-sized files (when processing streaming audio, for example).
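
The temporary-file fallback described above can be sketched roughly as follows. This is only an illustration, not a mature library: `TempFileFanOut`, `StreamConsumer` and `process` are hypothetical names, and the sketch assumes the source stream is finite and fits on disk.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class TempFileFanOut {

    /** Hypothetical stand-in for a third-party routine that reads an InputStream. */
    interface StreamConsumer<T> {
        T consume(InputStream in) throws Exception;
    }

    /** Spools the source into a temp file, then gives each consumer its own stream over it. */
    static <T> List<T> process(InputStream source, List<StreamConsumer<T>> consumers)
            throws Exception {
        Path spool = Files.createTempFile("spool-", ".bin");
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, consumers.size()));
        try {
            // Single sequential pass over the source; consumers start only afterwards.
            Files.copy(source, spool, StandardCopyOption.REPLACE_EXISTING);

            List<Future<T>> futures = new ArrayList<>();
            for (StreamConsumer<T> consumer : consumers) {
                futures.add(pool.submit(() -> {
                    // Each consumer has an independent file position, so they never
                    // wait for one another.
                    try (InputStream in = Files.newInputStream(spool)) {
                        return consumer.consume(in);
                    }
                }));
            }

            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get()); // rethrows a consumer failure as ExecutionException
            }
            return results;
        } finally {
            pool.shutdownNow();
            Files.deleteIfExists(spool);
        }
    }
}
```

A failing consumer cannot stall the others here, which is why this variant feels more reliable; the price is the disk space and the fact that no consumer starts before the whole source has been copied.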

Recommended answer

The way I see it, you should have at least some kind of buffering, so that different consumers can move through the stream at different paces without everything being constantly bogged down by whichever consumer is currently the slowest. Without buffering you basically ensure worst-case performance and get very little benefit from concurrency.

You could, say, tag each chunk with the consumers that have used it so far and then delete the chunks that are completely used up. Maybe this could be achieved by each consumer holding a reference to each chunk it hasn't yet used, which would allow the GC to take care of used chunks automatically. The producer might keep a list of WeakReferences to the chunks, so that it has a handle on the number of chunks yet to be used and can base its throttling on that.
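
As a rough illustration of the bookkeeping involved, here is a sketch that swaps the WeakReference/GC idea for an explicit counter: each chunk counts the consumers that still have to finish it, and a `Semaphore` caps how many unfinished chunks the producer may have outstanding. `CountedChunk`, `allocate` and `release` are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class CountedChunk {
    final byte[] data;                         // treated as read-only by every consumer
    private final AtomicInteger pendingConsumers;
    private final Semaphore window;            // one permit per chunk the producer may buffer

    private CountedChunk(byte[] data, int consumers, Semaphore window) {
        this.data = data;
        this.pendingConsumers = new AtomicInteger(consumers);
        this.window = window;
    }

    /** Producer side: blocks while the maximum number of unfinished chunks is outstanding. */
    static CountedChunk allocate(byte[] data, int consumers, Semaphore window)
            throws InterruptedException {
        window.acquire();
        return new CountedChunk(data, consumers, window);
    }

    /**
     * Consumer side: call exactly once per consumer when it is done with this chunk.
     * Returns true for the last consumer, which frees one slot for the producer.
     */
    boolean release() {
        if (pendingConsumers.decrementAndGet() == 0) {
            window.release();
            return true;
        }
        return false;
    }
}
```

With this scheme a consumer that dies must still release every chunk it was assigned, otherwise the window never reopens; the WeakReference variant avoids that obligation at the cost of depending on GC timing.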

I am also thinking about having a separate InputStream instance per thread, which internally communicates with the producer InputStream. This way you have an easy solution for your livelock hazard: try ... finally { is.close(); }. The dying consumer closes its own input stream, and this is communicated to the producer.
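
One way to get this behaviour from JDK classes alone is a pair of `PipedInputStream`/`PipedOutputStream` per consumer: once the reading side has been closed, a write to the matching `PipedOutputStream` throws an `IOException`, which tells the producer to drop that consumer. The sketch below is an assumption about how the idea could be wired up, not the answerer's implementation; `PipedTee`, `newConsumerStream` and `pump` are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class PipedTee {
    private final List<PipedOutputStream> sinks = new ArrayList<>();

    /** Creates the private stream for one consumer; call before pump() starts. */
    InputStream newConsumerStream(int bufferSize) throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        sinks.add(out);
        return new PipedInputStream(out, bufferSize);
    }

    /**
     * Copies the source to every consumer that is still alive.
     * Returns the number of consumers dropped because their pipe was closed.
     */
    int pump(InputStream source) throws IOException {
        int dropped = 0;
        byte[] chunk = new byte[8192];
        try {
            int n;
            while (!sinks.isEmpty() && (n = source.read(chunk)) != -1) {
                Iterator<PipedOutputStream> it = sinks.iterator();
                while (it.hasNext()) {
                    PipedOutputStream out = it.next();
                    try {
                        out.write(chunk, 0, n); // blocks while this consumer's buffer is full
                    } catch (IOException consumerGone) {
                        it.remove();            // the consumer closed its stream or died
                        dropped++;
                    }
                }
            }
        } finally {
            // Closing the write side delivers end-of-stream to the remaining consumers.
            for (PipedOutputStream out : sinks) {
                try {
                    out.close();
                } catch (IOException ignored) {
                    // nothing useful to do for a consumer that is already gone
                }
            }
        }
        return dropped;
    }
}
```

Each consumer thread would wrap its work in try-with-resources (or try ... finally) around its own stream, and `pump` runs on the producer thread. The pipe buffer gives each consumer some slack, but a consumer that stalls without closing still holds the producer back.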

I have some ideas involving an ArrayBlockingQueue per consumer. There would be some difficulty in ensuring that all consumers are properly fed without making the producer either block or busy-wait.
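
A minimal sketch of the queue-per-consumer idea follows. It does not remove the difficulty mentioned above: the producer simply blocks in `put` when a consumer's queue is full, so the queue capacity is the amount of slack a slow consumer gets. `QueueFanOut`, `ConsumerStream` and `pump` are hypothetical names, and the empty `EOF` array is an assumed end-of-stream marker.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class QueueFanOut {
    private static final byte[] EOF = new byte[0]; // sentinel, compared by identity

    /** The per-consumer stream; only its owning consumer thread may read from it. */
    static final class ConsumerStream extends InputStream {
        private final BlockingQueue<byte[]> queue;
        private byte[] current = new byte[0];
        private int pos;
        private boolean eof;
        private volatile boolean closed;

        ConsumerStream(int capacityInChunks) {
            queue = new ArrayBlockingQueue<>(capacityInChunks);
        }

        @Override
        public int read() throws IOException {
            byte[] one = new byte[1];
            return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) return 0;
            while (pos == current.length) {       // current chunk exhausted, fetch the next
                if (eof) return -1;
                try {
                    current = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("interrupted while waiting for data");
                }
                pos = 0;
                if (current == EOF) eof = true;
            }
            int n = Math.min(len, current.length - pos);
            System.arraycopy(current, pos, b, off, n);
            pos += n;
            return n;
        }

        @Override
        public void close() {
            closed = true;
            queue.clear(); // frees space so a producer blocked in put() can continue
        }
    }

    /** Producer loop: feeds every consumer that has not closed its stream. */
    static void pump(InputStream source, List<ConsumerStream> consumers)
            throws IOException, InterruptedException {
        byte[] buf = new byte[8192];
        int n;
        while ((n = source.read(buf)) != -1) {
            if (n == 0) continue;
            byte[] chunk = Arrays.copyOf(buf, n); // one copy, shared read-only by all consumers
            for (ConsumerStream c : consumers) {
                if (!c.closed) c.queue.put(chunk);
            }
        }
        for (ConsumerStream c : consumers) {
            if (!c.closed) c.queue.put(EOF);
        }
    }
}
```

As with the try ... finally suggestion, this relies on every consumer closing its stream when it stops reading; a consumer that dies without closing would leave the producer blocked in `put`.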
