如何以多线程方式调用不同类的相同方法 [英] How to call same method of a different class in multithreaded way

查看:107
本文介绍了如何以多线程方式调用不同类的相同方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在两个类中都有一个名为process的方法,可以说CLASS-A and CLASS-B.现在,在下面的循环中,我依次调用两个类的process method,意思是一个接一个,并且工作正常,但这不是我想要的方式.

I have a method named process in two of my Classes, lets say CLASS-A and CLASS-B. Now in the below loop, I am calling process method of both of my classes sequentially meaning one by one and it works fine but that is the not the way I am looking for.

for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
    final Map<String, String> response = entry.getPlugin().process(outputs);

    // write to database
    System.out.println(response);
}

有什么办法,我可以用多线程方式调用我两个类的处理方法.意思是一个线程将调用CLASS-A的处理方法,第二个线程将调用CLASS-B的处理方法.

Is there any way, I can call the process method of both of my classes in a multithreaded way. Meaning one thread will call process method of CLASS-A and second thread will call process method of CLASS-B.

然后在那之后,我考虑将process方法返回的数据写入数据库.因此,我可以再有一个线程来写入数据库.

And then after that I was thinking to write the data that is being returned by the process method into the database. So I can have one more thread for writing into database.

下面是我以多线程方式提出的代码,但是某种程度上它根本没有运行.

Below is the code that I came up with in a multithreaded way but somehow it is not running at all.

public void writeEvents(final Map<String, Object> data) {

    // Three threads: one thread for the database writer, two threads for the plugin processors
    final ExecutorService executor = Executors.newFixedThreadPool(3);

    final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();

    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);

    for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                final Map<String, String> response = entry.getPlugin().process(outputs);
                // put the response map in the queue for the database to read
                queue.offer(response);
            }
        });
    }

    Future<?> future = executor.submit(new Runnable () {
        public void run() {
            Map<String, String> map;
            try {
                while(true) {
                    // blocks until a map is available in the queue, or until interrupted
                    map = queue.take();
                    // write map to database
                    System.out.println(map);
                }
            } catch (InterruptedException ex) {
                // IF we're catching InterruptedException then this means that future.cancel(true)
                // was called, which means that the plugin processors are finished;
                // process the rest of the queue and then exit
                while((map = queue.poll()) != null) {
                    // write map to database
                    System.out.println(map);
                }
            }
        }
    });

    // this interrupts the database thread, which sends it into its catch block
    // where it processes the rest of the queue and exits
    future.cancel(true); // interrupt database thread

    // wait for the threads to finish
    try {
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //log error here
    }
}

但是,如果我删除了最后一行executor.awaitTermination(5, TimeUnit.MINUTES);,那么它开始运行良好,并且一段时间后,我总是会收到这样的错误-

But If I remove the last line executor.awaitTermination(5, TimeUnit.MINUTES); then it start running fine and after some time, I always get error like this-

JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.
JVMDUMP032I JVM requested Heap dump using 'S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd' in response to an event
JVMDUMP010I Heap dump written to S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.

有人可以帮我弄清楚我在上面的代码中存在的问题和错误吗?如果我按顺序运行,则不会出现任何错误,并且可以正常工作.

Can anybody help me in figuring out what's the problem and what wrong I am doing in my above code? if I am running sequentially then I don't get any errors and it works fine.

而且与我的方法相比,还有其他更好的方法吗?因为将来我可以拥有多个插件处理器,而不是两个.

我想做的是-以多线程方式调用我两个类的处理方法,然后将其写入数据库bcoz,我的处理方法将返回Map.

What I am trying to do is- Call the process method of both of my classes in a multithreaded way and then write into the database bcoz my process method will return back a Map.

在此方面,我们将为您提供帮助.并且,如果可能,我正在寻找可行的示例.感谢您的帮助,

Any help will be appreciated on this.. And I am looking for a workable example on this if possible. Thanks for the help,

推荐答案

您粘贴的代码段几乎没有问题,如果您对其进行修复,则应该可以.
1.您正在使用一个无限循环从阻塞队列中获取元素,并尝试使用Future打破它.这绝对不是一个好方法.这种方法的问题是您的数据库线程可能永远不会运行,因为调用者线程中运行的将来任务甚至可能在运行之前取消该线程.这容易出错.
-您应该以固定次数运行while循环(您已经知道那里有多少个生产者,或者您将获得响应的次数).

The code snippet you pasted has few issues, if you fix them, this should work.
1. You are using an infinite loop to fetch element from the blocking queue and trying to break this using future. This is definitely not a good approach. The problem with this approach is it is possible that your database thread would never run because it could be cancelled by the future task running in the caller thread even before it runs. This is error-prone.
- You should run the while loop fixed number of times (you already know how many producers are there or how many times you are going to get the response).

  1. 此外,提交给执行者服务的任务应该是独立的任务...这里的数据库任务取决于其他任务的执行.如果执行策略发生更改,这也可能导致死锁.例如,如果您使用单线程池执行程序,并且如果计划了数据库线程,它将阻塞等待生产者将数据添加到队列中.

  1. Also, tasks submitted to executor service should be independent tasks...here your database task is dependent on the execution of other tasks..this can also lead to deadlock if your execution policy changes..for example if you use single thread pool executor and if database thread is scheduled it would just block waiting for producers to add data in the queue.

  • 一个好方法是创建一个任务,该任务在同一线程中检索数据并更新数据库.
  • 或者首先检索所有响应,然后并行执行数据库操作

public void writeEvents(最终地图数据){

public void writeEvents(final Map data) {

final ExecutorService executor = Executors.newFixedThreadPool(3);       
@SuppressWarnings("unchecked")
final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);

for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
    executor.submit(new Runnable () {
        public void run() {
            try {
                final Map<String, String> response = entry.getPlugin().process(outputs);
                //process the response and update database.
                System.out.println(map);
            } catch (Throwable e) {
                //handle execption
            } finally {
                //clean up resources
            }               
        }
    });
}

//这将等待正在运行的线程完成..这是有序的关闭. executor.shutdown(); }

// This will wait for running threads to complete ..it's an orderly shutdown. executor.shutdown(); }

这篇关于如何以多线程方式调用不同类的相同方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆