使用队列的生产者/消费者线程 [英] Producer/Consumer threads using a Queue

查看:155
本文介绍了使用队列的生产者/消费者线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建某种生产者/消费者线程应用。但我不确定在两者之间实现队列的最佳方法是什么。

I'd like to create some sort of Producer/Consumer threading app. But I'm not sure what the best way to implement a queue between the two.

所以我有两个想法(两者都可能是完全错误的)。我想知道哪个更好,如果它们都吮吸那么什么是实现队列的最佳方式。这主要是我在这些例子中对队列的实现,我很关心。我正在扩展一个内部类的Queue类,并且是线程安全的。以下是两个示例,每个示例包含4个类。

So I've some up with two ideas (both of which could be entirely wrong). I would like to know which would be better and if they both suck then what would be the best way to implement the queue. It's mainly my implementation of the queue in these examples that I'm concerned about. I'm extending a Queue class that is an in house class and is thread safe. Below are two examples with 4 classes each.

主类 -

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

消费者类 -

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

生产者类 -

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

队列类 -

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

OR

主类 -

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

消费者类 -

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

生产者类 -

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

队列类 -

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

去吧!

推荐答案

Java 5+拥有此类所需的所有工具。您需要:

Java 5+ has all the tools you need for this kind of thing. You will want to:


  1. 将所有制作人放在一个 ExecutorService ;

  2. 将所有消费者放入另一个消费者 ExecutorService ;

  3. 如有必要,使用 BlockingQueue

  1. Put all your Producers in one ExecutorService;
  2. Put all your Consumers in another ExecutorService;
  3. If necessary, communicate between the two using a BlockingQueue.

我说(如果有必要)(3)因为根据我的经验,这是一个不必要的步骤。您所做的就是向消费者执行者服务提交新任务。所以:

I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

所以生产者直接提交给消费者

这篇关于使用队列的生产者/消费者线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆