What is the fastest cyclic synchronization in Java (ExecutorService vs. CyclicBarrier vs. X)?

Question

Which Java synchronization construct is likely to provide the best performance for a concurrent, iterative processing scenario with a fixed number of threads like the one outlined below? After experimenting on my own for a while (using ExecutorService and CyclicBarrier) and being somewhat surprised by the results, I would be grateful for some expert advice and maybe some new ideas. Existing questions here do not seem to focus primarily on performance, hence this new one. Thanks in advance!

The core of the app is a simple iterative data processing algorithm, parallelized to spread the computational load across 8 cores on a Mac Pro, running OS X 10.6 and Java 1.6.0_07. The data to be processed is split into 8 blocks and each block is fed to a Runnable to be executed by one of a fixed number of threads. Parallelizing the algorithm was fairly straightforward, and it functionally works as desired, but its performance is not yet what I think it could be. The app seems to spend a lot of time in system calls synchronizing, so after some profiling I wonder whether I selected the most appropriate synchronization mechanism(s).

A key requirement of the algorithm is that it needs to proceed in stages, so the threads need to sync up at the end of each stage. The main thread prepares the work (very low overhead), passes it to the threads, lets them work on it, then proceeds when all threads are done, rearranges the work (again very low overhead) and repeats the cycle. The machine is dedicated to this task, Garbage Collection is minimized by using per-thread pools of pre-allocated items, and the number of threads can be fixed (no incoming requests or the like, just one thread per CPU core).

My first implementation used an ExecutorService with 8 worker threads. The program creates 8 tasks holding the work and then lets them work on it, roughly like this:

// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
    // package data into 8 work items
    ...

    // create one Callable task per work item
    ...

    // submit the Callables to the worker threads
    executorService.invokeAll( taskList );
}
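
For illustration only, the elided work-item/task-creation steps might be filled in roughly as follows; WorkItem, workItems and processBlock() are hypothetical placeholders, not the original code:

// hypothetical sketch: one Callable per pre-split block of data
// (WorkItem, workItems and processBlock() stand in for the app's real types)
List<Callable<Integer>> taskList = new ArrayList<Callable<Integer>>( 8 );
for( final WorkItem item : workItems )    // the 8 blocks prepared above
{
    taskList.add( new Callable<Integer>() {
        public Integer call() {
            return processBlock( item );  // the actual per-block computation
        }
    } );
}

// invokeAll() blocks until all tasks have completed, so this call is
// also the per-cycle synchronization point
executorService.invokeAll( taskList );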

This works well functionally (it does what it should), and for very large work items indeed all 8 CPUs become highly loaded, as much as the processing algorithm would be expected to allow (some work items will finish faster than others, then idle). However, as the work items become smaller (and this is not really under the program's control), the user CPU load shrinks dramatically:

blocksize | system | user | cycles/sec
256k        1.8%    85%     1.30
64k         2.5%    77%     5.6
16k         4%      64%     22.5
4096        8%      56%     86
1024       13%      38%     227
256        17%      19%     420
64         19%      17%     948
16         19%      13%     1626

Legend:
- block size = size of the work item (= computational steps)
- system = system load, as shown in OS X Activity Monitor (red bar)
- user = user load, as shown in OS X Activity Monitor (green bar)
- cycles/sec = iterations through the main while loop; more is better

The primary area of concern here is the high percentage of time spent in the system, which appears to be driven by thread synchronization calls. As expected, for smaller work items, ExecutorService.invokeAll() will require relatively more effort to sync up the threads versus the amount of work being performed in each thread. But since ExecutorService is more generic than it would need to be for this use case (it can queue tasks for threads if there are more tasks than cores), I thought maybe there would be a leaner synchronization construct.

The next implementation used a CyclicBarrier to sync up the threads before receiving work and after completing it, roughly as follows:

main() {
    // create the barrier
    barrier = new CyclicBarrier( 8 + 1 );

    // create Runnable for thread, tell it about the barrier
    Runnable task = new WorkerThreadRunnable( barrier );

    // start the threads
    for( int i = 0; i < 8; i++ )
    {
        // create one thread per core
        new Thread( task ).start();
    }

    while( ... ) {
        // tell threads about the work
        ...

        // N threads + this will call await(), then system proceeds
        barrier.await();

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

    public void run()
    {
        while( true )
        {
            // wait for work
            barrier.await();

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Again, this works well functionally (it does what it should), and for very large work items indeed all 8 CPUs become highly loaded, as before. However, as the work items become smaller, the load still shrinks dramatically:

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.7%     78%    6.1
16k         5.5%     52%    25
4096        9%       29%    64
1024       11%       15%    117
256        12%        8%    169
64         12%        6.5%  285
16         12%        6%    377

For large work items, synchronization is negligible and the performance is identical to V1. But unexpectedly, the results of the (highly specialized) CyclicBarrier seem MUCH WORSE than those for the (generic) ExecutorService: throughput (cycles/sec) is only about 1/4th of V1. A preliminary conclusion would be that even though this seems to be the advertised ideal use case for CyclicBarrier, it performs much worse than the generic ExecutorService.

It seemed worth a try to replace the first cyclic barrier await() with a simple wait/notify mechanism:

main() {
    // create the barrier
    // create Runnable for thread, tell it about the barrier
    // start the threads

    while( ... ) {
        // tell threads about the work
        // for each: workerThreadRunnable.setWorkItem( ... );

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;
    @NotNull volatile private Callable<Integer> workItem;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        synchronized( this )
        {
            workItem = callable;
            notify();
        }
    }

    public void run()
    {
        while( true )
        {
            // wait for work
            while( true )
            {
                synchronized( this )
                {
                    if( workItem != NO_WORK ) break;

                    try
                    {
                        wait();
                    }
                    catch( InterruptedException e ) { e.printStackTrace(); }
                }
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Again, this works well functionally (it does what it should).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.4%     80%    6.3
16k         4.6%     60%    30.1
4096        8.6%     41%    98.5
1024       12%       23%    202
256        14%       11.6%  299
64         14%       10.0%  518
16         14.8%      8.7%  679

The throughput for small work items is still much worse than that of the ExecutorService, but about 2x that of the CyclicBarrier. Eliminating one CyclicBarrier eliminates half of the gap.

Since this app is the primary one running on the system and the cores idle anyway if they're not busy with a work item, why not try a busy wait for work items in each thread, even if that spins the CPU needlessly. The worker thread code changes as follows:

class WorkerThreadRunnable implements Runnable {
    // as before

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        workItem = callable;
    }

    public void run()
    {
        while( true )
        {
            // busy-wait for work
            while( true )
            {
                if( workItem != NO_WORK ) break;
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

This also works well functionally (it does what it should).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.2%     81%    6.3
16k         4.2%     62%     33
4096        7.5%     40%    107
1024       10.4%     23%    210
256        12.0%    12.0%   310
64         11.9%    10.2%   550
16         12.2%     8.6%   741

For small work items, this increases throughput by a further 10% over the CyclicBarrier + wait/notify variant, which is not insignificant. But it is still much lower-throughput than V1 with the ExecutorService.

So what is the best synchronization mechanism for such a (presumably not uncommon) problem? I am wary of writing my own sync mechanism to completely replace ExecutorService (assuming that it is too generic and there has to be something that can still be taken out to make it more efficient). It is not my area of expertise and I'm concerned that I'd spend a lot of time debugging it (since I'm not even sure my wait/notify and busy wait variants are correct) for uncertain gain.

Any advice would be greatly appreciated.

Answer

It does seem that you do not need any synchronization between the workers. Maybe you should consider using the ForkJoin framework, which is available in Java 7 and also as a separate library. Some links:

  • Tutorial at Oracle
  • Original paper by Doug Lea
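
For reference, a minimal sketch of what the staged processing could look like on top of a ForkJoinPool; the processBlock() method, the double[] work representation and the split threshold are hypothetical placeholders, and whether this actually beats the hand-rolled variants would have to be measured:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// minimal sketch, not the asker's code: processBlock(), the double[] data
// and THRESHOLD are hypothetical placeholders
class StageTask extends RecursiveAction {
    private static final int THRESHOLD = 16 * 1024;  // tune to the block size
    private final double[] data;
    private final int from, to;

    StageTask( double[] data, int from, int to ) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override protected void compute() {
        if( to - from <= THRESHOLD ) {
            processBlock( data, from, to );          // the per-block work
        } else {
            int mid = ( from + to ) >>> 1;           // split and recurse
            invokeAll( new StageTask( data, from, mid ),
                       new StageTask( data, mid, to ) );
        }
    }

    private void processBlock( double[] d, int lo, int hi ) { /* ... */ }
}

// one pool for the whole run (one thread per core); 'data' stands for
// the work prepared by the main thread each cycle
ForkJoinPool pool = new ForkJoinPool( 8 );
while( ... ) {
    // prepare/rearrange the work (low overhead), then run one stage;
    // invoke() blocks until the stage is complete, preserving the
    // "prepare, process, repeat" structure of the original loop
    pool.invoke( new StageTask( data, 0, data.length ) );
}

The attraction is that joining a stage and distributing the sub-blocks are handled by the framework's work-stealing scheduler rather than by an explicit barrier or an invokeAll() round-trip per cycle.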
