从 FasterXML 读取值时 Flink 收集器问题 [英] Flink Collector Issue While ReadValue from FasterXML

查看:18
本文介绍了从 FasterXML 读取值时 Flink 收集器问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有 Kafka 值作为字符串,POJO 如下,

I have Kafka values as String, and POJO as below,

{"name":"John","timeStamp":"2020-08-11T13:31:31"}

class Person{
    
    private String name;    

    private LocalDateTime timeStamp; 
}

这个时间戳来自Kafka的字符串,并将它们转换为LocalDateTime.

this Time Stamp comes as String from Kafka, and converting them into LocalDateTime.

当我使用来自 FasterXML 的必需库作为 Standalone 和 objectMapper.readValue(value, Person.class) 运行程序时,它工作正常.正在转换.

When i run the program as Standalone and objectMapper.readValue(value, Person.class) using required library from FasterXML, it works fine. It's converting.

当我从 Flink 框架中阅读以下内容时,

When I read from Flink Framework with the below,

 stream.flatMap(new FlatMapFunction<String, Person>() {
            public void flatMap(String value, Collector<Person> out) {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).print();
        env.execute();

我遇到了以下问题,

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: com.fasterxml.jackson.databind.json.JsonMapper@1b7cc17c is not serializable. The object probably contains or references non serializable fields.

该消息显示 Person 对象不可序列化,并且我已经为 Person 类实现了 Serializable 但没有运气.而且,在下面尝试过,也不是运气.

The message shows me the Person object is not serializable, and i have implemented Serializable for Person class but no luck. And also, tried below, not luck too.

@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime timeStamp; 

更新:

看起来像 API 的问题,我在下面的链接中读到了,

Looks like issue with API, i read in the below link,

https://issues.apache.org/jira/browse/FLINK-12113

推荐答案

异常指出 JsonMapper 实例不是 Serializable - 如果我没记错的话,它从 2.1 版本开始,已被序列化.此外,Person 类也应该是可序列化的.

The exception states that the JsonMapper instance is not Serializable - if I'm not mistaken, it has been made serializable as of version 2.1. Also, Person class should be made serializable as well.

因此,在您的情况下,我会说您应该切换到 jackson-databind 版本 >=2.1 或者可能使 JsonMapper static领域.

So, in your case I would say you should either switch to jackson-databind version >=2.1 or probably make JsonMapper static field.

如果是Person 类,只需简单地实现Serializable 接口:

In case of Person class, just simply implement Serializable interface:

class Person implements Serializable {
    
    private String name;    

    private LocalDateTime timeStamp; 
}

这篇关于从 FasterXML 读取值时 Flink 收集器问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆