从 FasterXML 读取值时 Flink 收集器问题 [英] Flink Collector Issue While ReadValue from FasterXML
问题描述
我有 Kafka 值作为字符串,POJO 如下,
I have Kafka values as String, and POJO as below,
{"name":"John","timeStamp":"2020-08-11T13:31:31"}
class Person{
private String name;
private LocalDateTime timeStamp;
}
这个时间戳来自Kafka的字符串,并将它们转换为LocalDateTime
.
this Time Stamp comes as String from Kafka, and converting them into LocalDateTime
.
当我使用来自 FasterXML 的必需库作为 Standalone 和 objectMapper.readValue(value, Person.class)
运行程序时,它工作正常.正在转换.
When i run the program as Standalone and objectMapper.readValue(value, Person.class)
using required library from FasterXML, it works fine. It's converting.
当我从 Flink 框架中阅读以下内容时,
When I read from Flink Framework with the below,
stream.flatMap(new FlatMapFunction<String, Person>() {
public void flatMap(String value, Collector<Person> out) {
try {
out.collect(objectMapper.readValue(value, Person.class));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}).print();
env.execute();
我遇到了以下问题,
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: com.fasterxml.jackson.databind.json.JsonMapper@1b7cc17c is not serializable. The object probably contains or references non serializable fields.
该消息显示 Person 对象不可序列化,并且我已经为 Person
类实现了 Serializable
但没有运气.而且,在下面尝试过,也不是运气.
The message shows me the Person object is not serializable, and i have implemented Serializable
for Person
class but no luck. And also, tried below, not luck too.
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime timeStamp;
更新:
看起来像 API 的问题,我在下面的链接中读到了,
Looks like issue with API, i read in the below link,
https://issues.apache.org/jira/browse/FLINK-12113
推荐答案
异常指出 JsonMapper
实例不是 Serializable
- 如果我没记错的话,它从 2.1
版本开始,已被序列化.此外,Person
类也应该是可序列化的.
The exception states that the JsonMapper
instance is not Serializable
- if I'm not mistaken, it has been made serializable as of version 2.1
. Also, Person
class should be made serializable as well.
因此,在您的情况下,我会说您应该切换到 jackson-databind
版本 >=2.1
或者可能使 JsonMapper static
领域.
So, in your case I would say you should either switch to jackson-databind
version >=2.1
or probably make JsonMapper static
field.
如果是Person
类,只需简单地实现Serializable
接口:
In case of Person
class, just simply implement Serializable
interface:
class Person implements Serializable {
private String name;
private LocalDateTime timeStamp;
}
这篇关于从 FasterXML 读取值时 Flink 收集器问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!