AsyncFunction-以无序模式收集错误 [英] AsyncFunction - Bug collecting throwable in unorder mode

查看:100
本文介绍了AsyncFunction-以无序模式收集错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在无序模式下使用AsyncFunc经历无限循环.

可以使用以下代码进行复制

 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class AsyncTest {

    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> withTimestamps = env.fromCollection(Arrays.asList(1,2,3,4,5));

        AsyncDataStream.unorderedWait(withTimestamps, 
            (AsyncFunction<Integer, String>) (input, collector) -> {
                if (input == 3){
                    collector.collect(new RuntimeException("Test"));
                    return;
                }
                collector.collect(Collections.singleton("Ok"));
            }, 10, TimeUnit.MILLISECONDS)
            .returns(String.class)
            .print();

        env.execute("unit-test");
     }
}
 

我的猜测是 UnorderedStreamElementQueue 是无限循环的原因.

似乎将包含失败未来的 StreamElementQueueEntry 添加到其 firstSet 字段中,但是从不删除它(因为此失败未来不会触发onCompleteHandler方法). >

任何人都知道这可能是对的还是我的代码有误?

解决方案

AsyncWaitOperator中确实存在一个错误.问题在于,它对异常(用户异常以及超时异常)的结果没有反应.

在有序等待的情况下,这导致只有在完成另一个异步结果(无异常)的情况下,才会检测到异常结果.

在无序等待的情况下,这将导致StreamElementQueueEntry从未从firstSet移到completedQueue.

有一个问题.希望将在接下来的几天内修复.

I am experiencing an infinite loop using an AsyncFunc in unordered mode.

It can be reproduced using the following code

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class AsyncTest {

    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> withTimestamps = env.fromCollection(Arrays.asList(1,2,3,4,5));

        AsyncDataStream.unorderedWait(withTimestamps, 
            (AsyncFunction<Integer, String>) (input, collector) -> {
                if (input == 3){
                    collector.collect(new RuntimeException("Test"));
                    return;
                }
                collector.collect(Collections.singleton("Ok"));
            }, 10, TimeUnit.MILLISECONDS)
            .returns(String.class)
            .print();

        env.execute("unit-test");
     }
}

My guess would be that the UnorderedStreamElementQueue is the reason of the infinite loop.

It seems to add the StreamElementQueueEntry containing the failing future into its firstSet field but never removes it (as this failing future is not triggering the onCompleteHandler method).

Anyone knows if this could be right or if I am making a mistake in my code?

解决方案

There is indeed a bug in the AsyncWaitOperator. The problem is that it does not react to results which are exceptions (user exceptions as well as timeout exceptions).

In the case of ordered waiting, this causes that exceptional results are only detected if another async result has been completed (without an exception).

In the case of unordered waiting, this causes that the StreamElementQueueEntry is never moved from the firstSet into the completedQueue.

There is an issue for this problem. It will hopefully be fixed in the next couple of days.

这篇关于AsyncFunction-以无序模式收集错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆