Java并发:多生产者一个消费者 [英] java concurrency: multi-producer one-consumer

查看:311
本文介绍了Java并发:多生产者一个消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到的情况是,不同的线程填充一个队列(生产者),而一个消费者从该队列中检索元素.我的问题是,当从队列中检索这些元素之一时,某些元素会丢失(缺少信号?).生产者代码为:

I have a situation where different threads populate a queue (producers) and one consumer retrieve element from this queue. My problem is that when one of these elements are retrieved from the queue some is missed (missing signal?). The producers code is:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

它们是通过以下方式创建和运行的:

and they are created and run with:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

消费者代码为:

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

运行代码时,有时会添加20个元素并检索20个元素,但在其他情况下,检索到的元素少于20个.您知道如何解决该问题吗?

When the code is run I get sometimes 20 elements added and 20 retrieved, but in other cases the elements retrieved are less than 20. Any idea how to fix that?

推荐答案

我建议您使用BlockingQueue而不是Queue. LinkedBlockingDeque可能是您的理想选择.

I'd suggest you use a BlockingQueue instead of a Queue. A LinkedBlockingDeque might be a good candidate for you.

您的代码如下:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

然后您只需要

queue.take()

在您的使用者线程上

这个想法是.take()将阻塞,直到队列中有可用的项,然后精确返回一个(这是我认为您的实现遭受的问题:轮询时缺少通知). .put()负责为您执行所有通知.无需等待/通知.

The idea is that .take() will block until an item is available in the queue and then return exactly one (which is where I think your implementation suffers: missing notification while polling). .put() is responsible for doing all the notifications for you. No wait/notifies needed.

这篇关于Java并发:多生产者一个消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆