Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO [英] Apache Flink - how to send and consume POJOs using AWS Kinesis

查看:25
本文介绍了Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 Flink 使用来自 Kinesis 的 POJO.
是否有正确发送和反序列化消息的标准?

I want to consume POJOs arriving from Kinesis with Flink.
Is there any standard for how to correctly send and deserialize the messages?

谢谢

推荐答案

我解决了:

DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
        "my-stream",
        new POJODeserializationSchema(),
        kinesisConsumerConfig));

public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
    private ObjectMapper mapper;

    @Override
    public SamplePojo deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }

        SamplePojo retVal = mapper.readValue(message, SamplePojo.class);

        return retVal;
    }

    @Override
    public boolean isEndOfStream(SamplePojo nextElement) {
        return false;
    }
}

这篇关于Apache Flink - 如何使用 AWS Kinesis 发送和使用 POJO的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆