GPars如何知道抛出异常时所有线程都已完成? [英] How know with GPars that all threads have finished when an exception is thrown?

查看:195
本文介绍了GPars如何知道抛出异常时所有线程都已完成?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在线程抛出异常的情况下,如何等待所有未抛出异常的线程完成(因此用户在所有内容停止前都不会再次启动)?

我以几种不同的方式使用GPars,所以我需要为每个(并行集合,异步闭包和fork / join)制定一个策略。例外情况没有被埋没,他们很好地通过承诺,getChildrenResults等处理,所以这不是问题(感谢Vaclav Pech的回答)。我只需要确保主线程等待,直到任何正在运行的完成或以其他方式停止。



例如,当使用并行集合时,一些线程继续运行,而有些线程在异常之后永远不会启动。因此,很难说出有多少人在等待,或者有可能得到他们。



我的猜测是,也许有一种方法可以与线程池(在这种情况下为GParsPool)。任何建议?



谢谢!

解决方案

我相信我有这个问题的解决方案,我在彻底的测试后在应用程序中实现了它,并且工作正常。

withPool闭包作为第一个被创建的池(一个jsr166y.ForkJoinPool)传递论据。我可以抓住它并将其存储在一个变量( currentPool )中,稍后由主线程使用,如下所示:

  GParsPool.withPool {pool  - > 
currentPool = pool

当引发异常并返回主线程为了处理,我可以让它等到所有事情都完成了,像这样:

 } catch(Exception exc){
if(currentPool){
while(!currentPool.isQuiescent()){
Thread.sleep(100)
println'等待线程完成'
}


println'all done'
}

isQuiescent()似乎是一种安全的方法,以确保没有更多的工作要做。

注意,在测试期间,我还发现异常似乎没有像我原先想象的那样终止循环的执行。如果我有一个500的列表,并且做了eachParallel,他们都会跑,不管第一个是否有错误。所以我必须通过在并行循环的异常处理程序中使用currentPool.shutdownNow()来终止循环。另请参阅: GPars - 尽早终止平行集合的正确方法

以下是实际解决方案的完整简化表示:

 <$ c 
jsr166y.ForkJoinPool currentPool

AtomicInteger threadCounter = new AtomicInteger(0)
AtomicInteger threadCounterEnd = new AtomicInteger(0)

AtomicReference<例外> realException = new AtomicReference< Exception>()

try {
GParsPool.withPool {pool - >
currentPool = pool

(1..500).eachParallel {
try {
if(threadCounter.incrementAndGet()== 1){
抛出新的RuntimeException('planet blaw up!')
}

if(realException.get()!= null){
//我们在这个eachParallel - 早退出
返回
}

//做一些长时间工作
整数计数器= 0
(1..1000000).each(){计数器++}

//标记是否通过
完成threadCounterEnd.incrementAndGet()
} catch(Exception exc){
realException.compareAndSet(null, exc)

pool.shutdownNow()
throw realException
}
}

} catch(Exception exc){
//如果我们使用了pool.shutdownNow(),我们需要查看真正的异常。
//这是需要的,因为pool.shutdownNow()有时会生成一个CancellationException
//它可以掩盖导致我们执行shutdownNow()的真正异常。 $(b)if(realException.get()){
exc = realException.get()
}

if(currentPool){
while(!currentPool .isQuiescent()){
Thread.sleep(100)
println'等待线程完成'
}
}

//进一步这里的异常处理...
exc.printStackTrace()
}
}

回到我之前的例子,如果我第一次在4核机器上抛出异常,大约有5个线程排队。 shutdownNow()会在大约20个左右的线程通过后关闭,所以在顶端附近进行'quit'检查可以帮助那些20个左右的人尽快退出。



只要将它张贴在这里,以便帮助其他人,作为对我在这里得到的所有帮助的回报。谢谢!


In the case of a thread throwing an exception, how can I wait until all threads that did not throw an exception have finished (so the user doesn't launch again until everything has stopped)?

I use GPars in several different ways, so I need a strategy for each (parallel collections, async closures, and fork/join). The exceptions are not getting buried, they are nicely handled via promises, getChildrenResults, etc., so that's not an issue (thanks to Vaclav Pech's answers). I just need to make sure that the main thread waits until anything that was still running gets to complete or is otherwise stopped.

For instance, when using parallel collections, some threads continue to run, while some never launch after the exception. So it's not easy to tell how many are out there to wait on, or to get a hold of them possibly.

My guess is maybe there's a way to work with the Thread pool (GParsPool in this case). Any suggestions?

Thanks!

解决方案

I believe I have a solution for the problem, I implemented it in the application after thorough testing and it works.

The withPool closure passes in the created pool (a jsr166y.ForkJoinPool) as the first argument. I can grab that and store it off in a variable (currentPool), to be used later by the main thread, like so:

    GParsPool.withPool { pool ->
        currentPool = pool

When an exception is thrown, and goes back up to the main thread for handling, I can make it wait until everything is finished, something like this:

    } catch (Exception exc) {
        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        println 'all done'
    }

The isQuiescent() seems to be a safe way to make sure there's no more work to be done.

Note that during testing, I also found that exceptions didn't seem to terminate execution of the loop as I originally thought. If I had a list of 500 and did an eachParallel, they all ran regardless if the 1st one through had an error. So I had to terminate the loop by using currentPool.shutdownNow() inside the parallel loop's exception handler. See also: GPars - proper way to terminate a parallel collection early

Here is a complete simplified representation of the actual solution:

void example() {
    jsr166y.ForkJoinPool currentPool

    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    AtomicReference<Exception> realException = new AtomicReference<Exception>()

    try {
        GParsPool.withPool { pool ->
            currentPool = pool

            (1..500).eachParallel {
                try {
                    if (threadCounter.incrementAndGet() == 1) {
                        throw new RuntimeException('planet blew up!')
                    }

                    if (realException.get() != null) {
                        // We had an exception already in this eachParallel - quit early
                        return
                    }

                    // Do some long work
                    Integer counter=0
                    (1..1000000).each() {counter++}

                    // Flag if we went all the way through
                    threadCounterEnd.incrementAndGet()
                } catch (Exception exc) {
                    realException.compareAndSet(null, exc)

                    pool.shutdownNow()
                    throw realException
                }
            }
        }
    } catch (Exception exc) {
        // If we used pool.shutdownNow(), we need to look at the real exception.
        // This is needed because pool.shutdownNow() sometimes generates a CancellationException
        // which can cover up the real exception that caused us to do a shutdownNow().
        if (realException.get()) {
            exc = realException.get()
        }

        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        // Do further exception handling here...
        exc.printStackTrace()
    }
}

Going back to my earlier example, if I threw an exception on the 1st time through on a 4-core machine, there were about 5 threads queued up. The shutdownNow() would cut things off after around 20 or so threads had gotten through, so having the 'quit early' check near the top helped those 20 or so quit as soon as possible.

Just posting it here in case it helps somebody else, in return for all the help I've gotten here. Thanks!

这篇关于GPars如何知道抛出异常时所有线程都已完成?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆