Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO [英] Apache Flink - how to send and consume POJOs using AWS Kinesis
本文介绍了Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我想使用 Flink 使用来自 Kinesis 的 POJO.
是否有正确发送和反序列化消息的标准?
I want to consume POJOs arriving from Kinesis with Flink.
Is there any standard for how to correctly send and deserialize the messages?
谢谢
推荐答案
我解决了:
DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
"my-stream",
new POJODeserializationSchema(),
kinesisConsumerConfig));
和
public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
private ObjectMapper mapper;
@Override
public SamplePojo deserialize(byte[] message) throws IOException {
if (mapper == null) {
mapper = new ObjectMapper();
}
SamplePojo retVal = mapper.readValue(message, SamplePojo.class);
return retVal;
}
@Override
public boolean isEndOfStream(SamplePojo nextElement) {
return false;
}
}
这篇关于Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文