AsyncFunction - 在无序模式下收集可抛出的错误 [英] AsyncFunction - Bug collecting throwable in unorder mode

查看:14
本文介绍了AsyncFunction - 在无序模式下收集可抛出的错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在无序模式下使用 AsyncFunc 遇到无限循环.

可以使用以下代码重现

import org.apache.flink.streaming.api.datastream.AsyncDataStream;导入 org.apache.flink.streaming.api.datastream.DataStream;导入 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;导入 org.apache.flink.streaming.api.functions.async.AsyncFunction;导入 org.junit.Test;导入 java.util.Arrays;导入 java.util.Collections;导入 java.util.concurrent.TimeUnit;公共类异步测试{@测试public void test() 抛出异常 {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<整数>withTimestamps = env.fromCollection(Arrays.asList(1,2,3,4,5));AsyncDataStream.unorderedWait(withTimestamps,(AsyncFunction) (输入,收集器) ->{如果(输入== 3){收集器.收集(新运行时异常(测试"));返回;}收集器.收集(Collections.singleton(确定"));}, 10, TimeUnit.MILLISECONDS).returns(String.class).打印();env.execute("单元测试");}}

我的猜测是 UnorderedStreamElementQueue 是无限循环的原因.

它似乎将包含失败的未来的 StreamElementQueueEntry 添加到其 firstSet 字段中,但从未将其删除(因为此失败的未来不会触发 onCompleteHandler 方法).

有谁知道这是否正确,或者我的代码是否有误?

解决方案

AsyncWaitOperator 确实存在错误.问题在于它不会对异常结果(用户异常以及超时异常)做出反应.

在有序等待的情况下,这会导致只有在另一个异步结果已经完成(没有异常)时才会检测到异常结果.

在无序等待的情况下,这会导致 StreamElementQueueEntry 永远不会从 firstSet 移动到 completedQueue.

此问题有一个问题.它有望在接下来的几天内得到修复.

I am experiencing an infinite loop using an AsyncFunc in unordered mode.

It can be reproduced using the following code

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class AsyncTest {

    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> withTimestamps = env.fromCollection(Arrays.asList(1,2,3,4,5));

        AsyncDataStream.unorderedWait(withTimestamps, 
            (AsyncFunction<Integer, String>) (input, collector) -> {
                if (input == 3){
                    collector.collect(new RuntimeException("Test"));
                    return;
                }
                collector.collect(Collections.singleton("Ok"));
            }, 10, TimeUnit.MILLISECONDS)
            .returns(String.class)
            .print();

        env.execute("unit-test");
     }
}

My guess would be that the UnorderedStreamElementQueue is the reason of the infinite loop.

It seems to add the StreamElementQueueEntry containing the failing future into its firstSet field but never removes it (as this failing future is not triggering the onCompleteHandler method).

Anyone knows if this could be right or if I am making a mistake in my code?

解决方案

There is indeed a bug in the AsyncWaitOperator. The problem is that it does not react to results which are exceptions (user exceptions as well as timeout exceptions).

In the case of ordered waiting, this causes that exceptional results are only detected if another async result has been completed (without an exception).

In the case of unordered waiting, this causes that the StreamElementQueueEntry is never moved from the firstSet into the completedQueue.

There is an issue for this problem. It will hopefully be fixed in the next couple of days.

这篇关于AsyncFunction - 在无序模式下收集可抛出的错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆