关于生产者和消费者模式在java与阻塞队列方法 [英] Regarding producers and consumer pattern in java with blocking queue approach

查看:123
本文介绍了关于生产者和消费者模式在java与阻塞队列方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在做一个关于生产者和消费者设计模式的研究,关于java中的线程,我最近在java 5中引入了在Java 5中BlockingQueue数据结构的介绍现在简单得多,因为BlockingQueue隐式地提供了这个控制引入阻塞方法put()和take()。现在,您不需要使用wait和notify在生产者和消费者之间进行通信。 BlockingQueue put()方法将阻塞如果队列在有界队列的情况下是满的,如果队列为空,take()将阻塞。在下一节中,我们将看到生产者消费者设计模式的代码示例。我已经开发了以下程序,但也请让我知道waut()和notify()的旧风格方法,我想用旧风格方法开发相同的逻辑

I was doing a research in producers and consumer design patterns with regards to threads in java, I recently explored in java 5 with the introduction With introduction of BlockingQueue Data Structure in Java 5 Its now much simpler because BlockingQueue provides this control implicitly by introducing blocking methods put() and take(). Now you don't require to use wait and notify to communicate between Producer and Consumer. BlockingQueue put() method will block if Queue is full in case of Bounded Queue and take() will block if Queue is empty. In next section we will see a code example of Producer Consumer design pattern. I have developed the below program but please also let me know the old style approach of waut() and notify() , I want to develop the same logic with old style approach also

请告知如何实现这一点,经典的方法是使用wait()和notify()方法在生产者和消费者线程之间进行通信,并在单个条件下阻塞它们,如满队列和空队列...? / p>

Folks please advise how this can be implemented in , classical way is using wait() and notify() method to communicate between Producer and Consumer thread and blocking each of them on individual condition like full queue and empty queue...?

    import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){

     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();

     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }

}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }


}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9


推荐答案

我可以在我的睡眠中做这个等待通知的东西(或至少我想我可以)。 Java 1.4源提供了所有这些的美丽的例子,但他们已经转换到做一切与原子,现在是更复杂得多。等待通知确实提供了灵活性和强大的功能,尽管其他方法可以屏蔽并发性的危险,并使更简单的代码。

I can do this wait-notify stuff in my sleep (or at least I think I can). Java 1.4 source provided beautiful examples of all this, but they've switched to doing everything with atomics and it's a lot more complicated now. The wait-notify does provide flexibility and power, though the other methods can shield you from the dangers of concurrency and make for simpler code.

为了做到这一点,字段,如下:

To do this, you want some fields, like so:

private final ConcurrentLinkedQueue<Intger>  sharedQueue =
                                                    new ConcurrentLinkedQueue<>();
private volatile   boolean  waitFlag = true;

您的Producer.run将如下所示:

Your Producer.run would look like this:

public void run()  {
    for (int i = 0; i < 100000, i++)  {
        System.out.println( "Produced: " + i );
        sharedQueue.add( new Integer( i ) );
        if (waitFlag)       // volatile access is cheaper than synch.
            synchronized (sharedQueue)  { sharedQueue.notifyAll(); }
    }
}

And Consumer.run:

And Consumer.run:

public void run()  {
    waitFlag = false;
    for (;;)  {
        Integer  ic = sharedQueue.poll();
        if (ic == null)  {
            synchronized (sharedQueue)  {
                waitFlag = true;
                // An add might have come through before waitFlag was set.
                ic = sharedQueue.poll();
                if (ic == null)  {
                    try  { sharedQueue.wait(); }
                    catch (InterruptedException ex)  {}
                    waitFlag = false;
                    continue;
                }
                waitFlag = true;
            }
        }
        System.out.println( "Consumed: " + ic );
    }
}

这会保持同步到最小。如果一切顺利,每个添加只有一个可变字段。你应该能够同时运行任意数量的生产者。 (消费者会更棘手 - 你必须放弃 waitFlag 。)你可以使用不同的对象等待/ notifyAll。

This keeps synchronizing to a minimum. If all goes well, there's only one look at a volatile field per add. You should be able to run any number of producers simultaneously. (Consumer's would be trickier--you'd have to give up waitFlag.) You could use a different object for wait/notifyAll.

这篇关于关于生产者和消费者模式在java与阻塞队列方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆