发布者/订阅者模式的并发实现 [英] The concurrent implementation of a publisher/subscriber pattern

查看:160
本文介绍了发布者/订阅者模式的并发实现的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用Java实现各种发布商/订阅者模式,并且目前运行的想法不多。

I want to implement a variety of of a publisher/subscriber pattern using Java and currently running out of ideas.

有1个发布商和N个订阅者发布对象,则每个订阅者需要以正确的顺序处理每个对象一次且仅一次。发布商和每个订阅者都在自己的线程中运行。

There is 1 publisher and N subscribers, the publisher publish objects then each subscriber need to process the each of the objects once and only once in the correct order. The publisher and each subscriber run in their own thread.

在我原来的实现中,每个订阅者都有自己的阻塞队列,发布者将对象放入每个订阅者的队列。这工作正常,但发布商将被阻止,如果任何订阅者的队列已满。这导致性能的降低,因为每个订户在处理对象时花费不同的时间。

In my original implementation, each subscriber has its own blocking queue, and the publisher put objects into each of the subscriber's queue. This works ok but the publisher will be blocked if any of the subscriber's queue is full. This leads to degration of performance as each subscriber takes different time in processing the object.

然后在另一个实现中,发布者将对象保存在其自己的队列中。与对象一起,AtomicInteger计数器与其相关联,具有在那里的订阅者数量。然后,每个订户将窥探队列并减少计数器,并在计数器达到零时将其从队列中删除。

Then in another implementation, the publisher hold the object in its own queue. Along with the object, an AtomicInteger counter is associated with it with the number of subscribers out there. Each subscriber then peek the queue and decrease the counter, and remove it from the queue when the counter reach zero.

以这种方式,发布者不受阻塞,但现在订阅者需要等待对方处理对象,从队列中删除对象对象可以被偷看。

In this way the publisher is free from blocking but now the subscriber will need to wait for each other to process the object, removing the object from the queue, before the next object can be peeked.

有更好的方法吗?我认为这应该是一个很常见的模式。

Is there any better way to do this? I assume this should be a quite common pattern.

推荐答案

发布者发布对象,则每个订阅者需要以正确的顺序处理每个对象一次并且仅一次。发布商和每个订阅者都在自己的主题中运行。

There is 1 publisher and N subscribers, the publisher publish objects then each subscriber need to process the each of the objects once and only once in the correct order. The publisher and each subscriber run in their own thread.

我将更改此架构。我最初考虑每个订户的队列,但我不喜欢这种机制。例如,如果第一个订阅者运行时间较长,则所有作业都将在该队列中结束,您只能执行1个工作线程。

I would change this architecture. I initially considered the queue per subscriber but I don't like that mechanism. For example, if the first subscriber takes longer to run, all of the jobs will end up in that queue and you will only be doing 1 thread of work.

必须按顺序运行订阅者,我会有一个线程池,它通过所有订阅者运行每条消息。对订阅者的调用需要是可重入的,这可能是不可能的。

Since you have to run the subscribers in order, I'd have a pool of threads which run each message through all of the subscribers. The calls to the subscribers will need to be reentrant which may not be possible.

所以你会有一个10个线程池(让我们说),每一个从发布商的队列,并执行类似以下内容:

So you would have a pool of 10 threads (let's say) and each one dequeues from the publisher's queue, and does something like the following:

public void run() {
    while (!shutdown && !Thread.currentThread().isInterrupted()) {
        Article article = publisherQueue.take();
        for (Subscriber subscriber : subscriberList) {
           subscriber.process(article);
        }
    }
}

这篇关于发布者/订阅者模式的并发实现的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆