生产者消费者 - ExecutorService& ArrayBlockingQueue [英] producer consumer - ExecutorService & ArrayBlockingQueue

查看:371
本文介绍了生产者消费者 - ExecutorService& ArrayBlockingQueue的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道如果我对生产者消费者设计的理解是正确的,使用ExecutorService& ArrayBlockingQueue。我知道有不同的方式来实现这个设计,但我想,最后,这取决于问题本身。

I wish to know if my understanding of the producer consumer design is correct by using the ExecutorService & ArrayBlockingQueue. I understand there are different ways to implement this design but I guess, at the end, it depends on the problem itself.

我不得不面对的问题是:我有一个ONE生产者从一个大文件(6GB)读取;它逐行读取并将每行转换为一个对象。它把对象放在一个ArrayBlockingQueue中。

The problem I had to confront is this: I have a ONE producer who reads from a big file (6GB); it reads line by line and converts every line to an object. It places the object in an ArrayBlockingQueue.

消费者(几个)从ArrayBlockingQueue中获取对象,并将其持久化到数据库中。

The consumers (few) take the object from the ArrayBlockingQueue and persist this to the database.

现在,显然生产者比消费者快得多;它需要几秒钟才能将每行转换为一个对象,但对于消费者来说需要更长的时间。

Now, obviously the producer is much faster than the consumer; it takes fractions of seconds to convert each line to an object but for the consumers it takes longer time.

所以...如果我想通过这样做加快这个过程:我创建了2个类ProducerThread和ConsumerThread,他们共享ArrayBlockingQueue。在2之间协调的线程如下:

So...if I wish to speedup this process by doing this: I created 2 classed 'ProducerThread' and 'ConsumerThread' they share the ArrayBlockingQueue. The Thread that coordinate between the 2 looks like this:

@Override
public void run()
{
    try{

        ArrayBlockingQueue<Ticket> queue = new ArrayBlockingQueue<Ticket>(40);
        ExecutorService threadPool = Executors.newFixedThreadPool(8);

        threadPool.execute(new SaleConsumerThread("NEW YORK", queue)); 
        threadPool.execute(new SaleConsumerThread("PARIS", queue));
        threadPool.execute(new SaleConsumerThread("TEL AVIV", queue));
        threadPool.execute(new SaleConsumerThread("HONG KONG", queue));
        threadPool.execute(new SaleConsumerThread("LONDON", queue));
        threadPool.execute(new SaleConsumerThread("BERLIN", queue));
        threadPool.execute(new SaleConsumerThread("AMSTERDAM", queue));

        Future producerStatus = threadPool.submit(new SaleProducerThread(progressBar, file, queue)); 
        producerStatus.get(); 
        threadPool.shutdown();   

    }catch(Exception exp)
    {
        exp.printStackTrace();
    }
}

我的问题是:


  1. 上述设计实际上是否同时使用每个线程?我的电脑是两个2.4GHz四核。

  1. Would the design above actually use each thread concurrently? My computer is Two 2.4GHz Quad-Core.

我不确定Future和.get()的用途是什么?

I'm not sure what does the Future and the .get() are for?



    顺便说一句,结果是快的(考虑第一个版本是顺序的,花了3小时)现在需要〜40分钟(但也许有改进的余地)

The result, by the way, are fast (consider the first version was sequential and it took 3hr) now it takes ~40 min (but maybe there're room for improvement).

感谢任何指针

推荐答案

花费大量时间等待IO以及在CPU中花费了多少时间。我怀疑你的主要瓶颈是数据库,你需要看看如何使导入更有效率。您可以尝试批量更新,因为这可以提高吞吐量。

I would look at how much time is spent waiting for IO and how much time is spent in CPU. I suspect your main bottleneck is the database and you need to look at how you can make the import more efficient. You could try batching your updates as this can improve throughput.

这篇关于生产者消费者 - ExecutorService&amp; ArrayBlockingQueue的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆