Apache Flink - How to implement a custom deserializer implementing DeserializationSchema

Problem description

I'm working with Flink and I'm using the Kafka connector. The messages I'm receiving in Flink are lists of comma-separated items: "'a','b','c',1,0.1 ....'12:01:00.000'". One of the items contains the event time. I would like to use this event time for per-partition watermarking (in the Kafka source), and then use it for session windowing. My case is a bit different from the usual one because, from what I have understood, people usually use Kafka timestamps and SimpleStringSchema(). In my case, I instead have to write my own deserializer that implements DeserializationSchema and returns a Tuple or a POJO, so basically I need to substitute SimpleStringSchema() with my own function. Flink offers some deserializers out of the box, but I really don't understand how I can create custom deserialization logic.

Checking the Flink website, I found this:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html

I have been given an example (thanks, David!), but I still don't understand how to implement mine.

https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java

I really need an example of how to do this for a list. The one linked above is for JSON, so it gives me the theory and the concept, but I got stuck there.

Solution

You should introduce a POJO like this:

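import java.io.Serializable;

public class Event implements Serializable {
    // ... other fields parsed from the message
    private Long timestamp;
    // Getter/setter so Flink can treat Event as a POJO instead of a generic (Kryo) type
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
}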

and implement a simple deserializer similar to the one in the link. You can parse the line either by manually splitting the message string on commas, or by using an out-of-the-box CSV reader such as opencsv to parse the line into your POJO:

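import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    @Override
    public Event deserialize(byte[] message) throws IOException {
        // Decode the raw Kafka record value and split it on commas
        String line = new String(message, StandardCharsets.UTF_8);
        String[] parts = line.split(",");

        Event event = new Event();
        // TODO: parts to event here
        return event;
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        // A Kafka topic is unbounded, so never signal end of stream
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        // Tells Flink which type this schema produces
        return TypeInformation.of(Event.class);
    }
}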
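One way the `// TODO: parts to event here` step could be filled in is sketched below in plain Java. It rests on assumptions the question does not confirm: values may be wrapped in single quotes and never contain escaped quotes, the event time is the last field, and because a value like `12:01:00.000` carries no date, the calendar date is supplied by the caller and interpreted as UTC. The `splitQuoted` helper is a hand-rolled stand-in for a CSV reader such as opencsv (it is not opencsv's API); unlike `line.split(",")`, it keeps commas that appear inside quoted values. The class name `EventParsing` and the nested `Event` are hypothetical, trimmed-down stand-ins for the POJO above.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

public class EventParsing {

    // Hypothetical minimal Event POJO; a real one would also hold the other fields.
    public static class Event implements java.io.Serializable {
        private Long timestamp;
        public Long getTimestamp() { return timestamp; }
        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    }

    // Splits on commas outside single quotes and drops the quote characters.
    // Assumption: quotes are never escaped inside a value.
    public static List<String> splitQuoted(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == '\'') {
                inQuotes = !inQuotes;               // opening or closing quote
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString().trim());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString().trim());      // last field has no trailing comma
        return fields;
    }

    // Assumes the event time is the last field, formatted like 12:01:00.000,
    // and that the caller supplies the date, interpreted as UTC.
    public static Event fromFields(List<String> fields, LocalDate date) {
        LocalTime time = LocalTime.parse(fields.get(fields.size() - 1));
        Event event = new Event();
        // Store the event time as epoch milliseconds
        event.setTimestamp(date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli());
        return event;
    }

    public static void main(String[] args) {
        List<String> fields = splitQuoted("'a','b,c',1,0.1,'12:01:00.000'");
        System.out.println(fields);                 // [a, b,c, 1, 0.1, 12:01:00.000]
        Event event = fromFields(fields, LocalDate.of(1970, 1, 1));
        System.out.println(event.getTimestamp());   // 43260000
    }
}
```

Inside `deserialize`, the same two steps would run on `line` and populate the real `Event`. Storing the event time as epoch milliseconds makes it directly usable as the value returned by a timestamp assigner. Note that a malformed time makes `LocalTime.parse` throw `DateTimeParseException`; the Flink Kafka connector documentation says a deserialization schema can instead return `null` for a corrupted message so that the record is skipped.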
