如何正确处理自定义 MapFunction 中的错误? [英] How to handle errors in custom MapFunction correctly?

查看:20
本文介绍了如何正确处理自定义 MapFunction 中的错误?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经为我的 Apache Flink 流程实现了 MapFunction.它正在解析传入的元素并将它们转换为其他格式,但有时会出现错误(即传入的数据无效).

I have implemented MapFunction for my Apache Flink flow. It is parsing incoming elements and convert them to other format but sometimes error can appear (i.e. incoming data is not valid).

我看到了两种可能的处理方式:

I see two possible ways how to handle it:

  • 忽略无效元素,但似乎我无法忽略错误,因为对于任何传入元素,我都必须提供传出元素.
  • 将传入元素拆分为有效和无效,但似乎我应该为此使用其他函数.

所以,我有两个问题:

  1. 如何在我的 MapFunction 中正确处理错误?
  2. 如何正确实现这样的转换功能?

推荐答案

您可以使用 FlatMapFunction 而不是 MapFunction.这将允许您仅发出有效的元素.下面显示了一个示例实现:

You could use a FlatMapFunction instead of a MapFunction. This would allow you to only emit an element if it is valid. The following shows an example implementation:

input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String input, Collector<Long> collector) throws Exception {
        try {
            Long value = Long.parseLong(input);
            collector.collect(value);
        } catch (NumberFormatException e) {
            // ignore invalid data
        }
    }
});

这篇关于如何正确处理自定义 MapFunction 中的错误?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆