RecursiveTask Throwing StackOverflowError While Executing ForkJoin


Problem Description

I designed a RecursiveTask. Here is the code for the task I designed.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SearchTask extends RecursiveTask<Map<Short, Long>> {

private static final long serialVersionUID = 1L;
private int majorDataThreshold = 16001;
private ConcurrentNavigableMap<Short, Long> dataMap;
private long fromRange;
private long toRange;
private boolean fromInclusive;
private boolean toInclusive;

public SearchTask(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
        final boolean fromInclusive, final boolean toInclusive) {
    this.dataMap = new ConcurrentSkipListMap<>(dataSource);
    this.fromRange = fromRange;
    this.toRange = toRange;
    this.fromInclusive = fromInclusive;
    this.toInclusive = toInclusive;
}

@Override
protected Map<Short, Long> compute() {
    final int size = dataMap.size();
    // This is not a perfect RecursiveTask, because the if condition is designed to overcome a stackoverflow error when map filled with 32k data
    if (size > majorDataThreshold+1000) {
        // List<SearchTask> tasks = createSubtasks();
        // tasks.get(0).fork();
        // tasks.get(1).fork();

        // Map<Short, Long> map = new ConcurrentHashMap<>(tasks.get(0).join());
        // map.putAll(tasks.get(1).join());
        // return map;

        return ForkJoinTask.invokeAll(createSubtasks()).stream().map(ForkJoinTask::join)
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
    }
    return search();
}

private List<SearchTask> createSubtasks() {
    final short lastKey = dataMap.lastKey();
    final short midkey = (short) (lastKey / 2);
    final short firstKey = dataMap.firstKey();
    final List<SearchTask> dividedTasks = new ArrayList<>();
    dividedTasks.add(
            new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(firstKey, true, midkey, false)),
                    fromRange, toRange, fromInclusive, toInclusive));
    dividedTasks
            .add(new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(midkey, true, lastKey, true)),
                    fromRange, toRange, fromInclusive, toInclusive));
    return dividedTasks;
}

private Map<Short, Long> search() {
    final Map<Short, Long> result = dataMap.entrySet().stream()
            .filter(serchPredicate(fromRange, toRange, fromInclusive, toInclusive))
            .collect(Collectors.toConcurrentMap(p -> p.getKey(), p -> p.getValue()));
    return result;
}

private static Predicate<? super Entry<Short, Long>> serchPredicate(final long fromValue, final long toValue,
        final boolean fromInclusive, final boolean toInclusive) {
    if (fromInclusive && !toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() < toValue);
    else if (!fromInclusive && toInclusive)
        return p -> (p.getValue() > fromValue && p.getValue() <= toValue);
    else if (fromInclusive && toInclusive)
        return p -> (p.getValue() >= fromValue && p.getValue() <= toValue);
    else
        return p -> (p.getValue() > fromValue && p.getValue() < toValue);
}

The maximum amount of data handled by this task is 32000 (32k).

In the code I'm splitting up tasks when the size passes a threshold:

 if (size > majorDataThreshold)

When I reduce majorDataThreshold below 16001, I get an error.

Stack trace:

at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool.helpStealer(Unknown Source)
at java.util.concurrent.ForkJoinPool.awaitJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.doJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
...........................Same trace
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
Caused by: java.lang.StackOverflowError
    ... 1024 more
Caused by: java.lang.StackOverflowError
    ... 1024 more
    .................Same trace
Caused by: java.lang.StackOverflowError
    at java.util.Collection.stream(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.search(SearchTask.java:74)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:56)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
    at java.util.concurrent.RecursiveTask.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
    at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
    at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)

To solve this I tried to use:

Collectors.toMap()
ConcurrentHashMap
Join Manually

The problem is still not solved.

Could someone help me find what is wrong in my RecursiveTask?

Unit test code

import java.util.Set;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class Container32kUniqueDataTest {

private ForkJoinRangeContainer forkJoinContianer;

@Before
public void setUp(){
    long[] data = genrateTestData();
    forkJoinContianer = new ForkJoinRangeContainer(data);
}

private long[] genrateTestData(){
    long[] data= new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i]=i+1;
    }
    return data;
}

@Test
public void runARangeQuery_forkJoin(){
    Set<Short> ids = forkJoinContianer.findIdsInRange(14, 17, true, true);
    assertEquals(true, ids.size()>0);
}
}   

A trimmed-down version of the container code

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinRangeContainer {

private Map<Short, Long> dataSource = new HashMap<Short, Long>();

public ForkJoinRangeContainer(long[] data) {
    populateData(data);
}

private void populateData(final long[] data) {
    for (short i = 0; i < data.length; i++) {
        dataSource.put(i, data[i]);
    }
}

public Set<Short> findIdsInRange(final long fromValue, long toValue, boolean fromInclusive, boolean toInclusive) {
    ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
    SearchTask task = new SearchTask(dataSource, fromValue, toValue, fromInclusive, toInclusive);
    Map<Short, Long> map = forkJoinPool.invoke(task);
    forkJoinPool.shutdown();
    return map.keySet();
}

public static void main(String[] args) {

    long[] data = new long[32000];
    for (int i = 0; i < 32000; i++) {
        data[i] = i + 1;
    }
    ForkJoinRangeContainer rf2 = new ForkJoinRangeContainer(data);
    Set<Short> ids = rf2.findIdsInRange(14, 17, true, true);
    if (ids.size() > 0) {
        System.out.println("Found Ids");
    }
}
}


Answer

You're stuck in a never-ending loop at SearchTask's return ForkJoinTask.invokeAll(createSubtasks()).

createSubtasks() creates subtasks over and over again with the same values, since you never reduce the dataMap size.
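One detail worth seeing concretely (an observation beyond the answer's text, and the class name is illustrative): midkey = lastKey / 2 ignores firstKey, so for any sub-map that no longer starts near zero the midpoint falls below the map's own bounds. A midpoint computed from both endpoints always lands inside the current sub-map:

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class SplitSketch {
    public static void main(String[] args) {
        // A sub-map that no longer starts at zero, like the right half
        // produced by the question's createSubtasks():
        ConcurrentSkipListMap<Short, Long> dataMap = new ConcurrentSkipListMap<>();
        for (short i = 50; i < 100; i++) dataMap.put(i, (long) i);

        short firstKey = dataMap.firstKey(); // 50
        short lastKey = dataMap.lastKey();   // 99
        short badMid = (short) (lastKey / 2);               // 49, below firstKey
        short goodMid = (short) ((firstKey + lastKey) / 2); // 74, inside [50, 99]

        // The question's split point is outside the map's bounds here:
        System.out.println(badMid < firstKey); // true

        // A midpoint from both endpoints partitions the map cleanly:
        int left = dataMap.subMap(firstKey, true, goodMid, false).size();
        int right = dataMap.subMap(goodMid, true, lastKey, true).size();
        System.out.println(left + " + " + right + " = " + dataMap.size());
    }
}
```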

F/J works by splitting an object into Left and Right. Each Left and Right creates a new Left and Right with half the values. This halving keeps going until a threshold at which you do the work.
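That halving pattern can be sketched with a minimal RecursiveTask (SumTask, its THRESHOLD, and the summing work are illustrative, not the question's code): each task owns a [lo, hi) slice and computes the midpoint of that slice, so every level genuinely halves.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000;
    private final long[] data;
    private final int lo, hi;

    SumTask(long[] data, int lo, int hi) {
        this.data = data; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {       // below the threshold: do the work
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = lo + (hi - lo) / 2;     // midpoint of *this* slice
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        invokeAll(left, right);           // run both halves, then combine
        return left.join() + right.join();
    }

    public static void main(String[] args) {
        long[] data = new long[32000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // sum of 1..32000 = 512016000
    }
}
```

Because each subtask's range is strictly smaller than its parent's, the recursion is guaranteed to reach the threshold and terminate.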

The first lesson I ever learned in programming was to keep it simple.

You're mixing Map, ArrayList, ConcurrentSkipListMap, ConcurrentNavigableMap, List, stream Collectors, HashMap and Set along with the F/J classes. That is most confusing, makes the code very difficult to follow, and usually leads to failure. Simple is better.

When you create a List for ForkJoinTask.invokeAll(), create the whole List at once, before the invoke. The List should contain all the subtasks you need to complete the work, each subtask with half the values of the one before. Don't use a stream; you don't have a stream, just a few subtasks in a List.
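A minimal sketch of that advice (the trivial CountTask is illustrative): the List is fully built before the single invokeAll() call, and the results are then joined with a plain loop rather than a stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class InvokeAllSketch {
    // A stand-in subtask; real code would do real work in compute().
    static class CountTask extends RecursiveTask<Integer> {
        private final int n;
        CountTask(int n) { this.n = n; }
        @Override protected Integer compute() { return n; }
    }

    public static void main(String[] args) {
        // Build the complete List of subtasks up front...
        List<CountTask> tasks = new ArrayList<>();
        for (int n = 1; n <= 4; n++) tasks.add(new CountTask(n));

        // ...invoke it once...
        ForkJoinTask.invokeAll(tasks);

        // ...and combine the results with a plain loop, no stream needed.
        int total = 0;
        for (CountTask t : tasks) total += t.join();
        System.out.println(total); // 1 + 2 + 3 + 4
    }
}
```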

Alternatively, split into Left and Right and do Left.fork() and Right.fork(). Each forked task then splits again with half the values, and so on.
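The Left.fork()/Right.fork() variant might look like the following sketch, applied to the question's kind of work, counting values inside a range (RangeCountTask and its threshold are illustrative names of mine):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RangeCountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private final long[] data;
    private final int lo, hi;
    private final long from, to;

    RangeCountTask(long[] data, int lo, int hi, long from, long to) {
        this.data = data; this.lo = lo; this.hi = hi;
        this.from = from; this.to = to;
    }

    @Override
    protected Integer compute() {
        if (hi - lo <= THRESHOLD) {            // small enough: do the work
            int count = 0;
            for (int i = lo; i < hi; i++)
                if (data[i] >= from && data[i] <= to) count++;
            return count;
        }
        int mid = lo + (hi - lo) / 2;
        RangeCountTask left = new RangeCountTask(data, lo, mid, from, to);
        RangeCountTask right = new RangeCountTask(data, mid, hi, from, to);
        left.fork();                           // Left runs asynchronously
        right.fork();                          // Right runs asynchronously
        return left.join() + right.join();     // wait for both halves
    }

    public static void main(String[] args) {
        long[] data = new long[32000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        int hits = ForkJoinPool.commonPool()
                .invoke(new RangeCountTask(data, 0, data.length, 14, 17));
        System.out.println(hits); // the values 14, 15, 16 and 17
    }
}
```

A common refinement is to fork only one half and call compute() on the other directly, so the current worker thread does useful work instead of only waiting in join().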

Exactly how you reduce the dataMap's size at each split is up to you.
