如何正确处理自定义MapFunction中的错误? [英] How to handle errors in custom MapFunction correctly?

查看:132
本文介绍了如何正确处理自定义MapFunction中的错误?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经为我的Apache Flink流实现了MapFunction.它正在解析传入的元素并将其转换为其他格式,但有时会出现错误(即传入的数据无效).

I have implemented MapFunction for my Apache Flink flow. It is parsing incoming elements and convert them to other format but sometimes error can appear (i.e. incoming data is not valid).

我看到了两种可能的处理方式:

I see two possible ways how to handle it:

  • 忽略无效元素,但似乎无法忽略错误,因为对于任何传入元素,我都必须提供传出元素.
  • 将传入的元素拆分为有效和无效的元素,但看来我应该为此使用其他函数.

所以,我有两个问题:

  1. 如何正确处理我的MapFunction中的错误?
  2. 如何正确实现此类转换功能?
  1. How to handle errors correctly in my MapFunction?
  2. How to implement such transformation functions correctly?

推荐答案

您可以使用FlatMapFunction代替MapFunction.这将使您仅在元素有效时才发出它.下面显示了示例实现:

You could use a FlatMapFunction instead of a MapFunction. This would allow you to only emit an element if it is valid. The following shows an example implementation:

input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String input, Collector<Long> collector) throws Exception {
        try {
            Long value = Long.parseLong(input);
            collector.collect(value);
        } catch (NumberFormatException e) {
            // ignore invalid data
        }
    }
});

这篇关于如何正确处理自定义MapFunction中的错误?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆