Two questions about Flink deserializing

Question

I'm new to Flink and cluster computing. I spent a whole day trying to correctly parse a simple stream from Kafka in Flink, with no results, which is a bit frustrating. In Kafka I have a stream of JSON-LD messages, each identified by a string key. I simply want to retrieve them in Flink and then separate the messages by key.

1) Initially I considered sending the messages as Strings instead of JSON-LD. I thought that would be easier...

I tried every deserializer, but none of them works. The simple deserializer obviously works, but it completely ignores the keys.

I believed I had to use the following (Flink apparently has just two deserializers that support keys):

// TypeInformationKeyValueSerializationSchema<K, V> produces Tuple2<K, V> elements
DataStream<Tuple2<String, String>> stream = env
        .addSource(new FlinkKafkaConsumer010<>("topicTest", new TypeInformationKeyValueSerializationSchema<>(String.class, String.class, env.getConfig()), properties))
        .rebalance();

stream.print();

But I get:

06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED java.io.EOFException at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)

How can I receive the stream messages without losing the keys?

2) My Kafka producer is implemented in JavaScript. Since Flink supports JSON deserialization, I thought I could send JSON objects to Kafka directly. I'm not sure this works correctly with JSON-LD, but I used:

json.parse(jsonld_message)

to serialize the message as JSON. Then I sent it with the usual string key.

But in Flink this code doesn't work:

DataStream<ObjectNode> stream = env
        .addSource(new FlinkKafkaConsumer010<>("topicTest", new JSONKeyValueDeserializationSchema(false), properties))
        .rebalance();

stream.print();

It throws a JsonParserException.

I think the first approach is simpler, and I prefer it because it lets me tackle one problem at a time (first: receive the data; second: convert the string back into JSON-LD, with an external library I guess).

Answer

SOLVED:

In the end I decided to implement a custom deserializer that implements the KeyedDeserializationSchema interface.
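
The answer does not show its code, so the following is only a minimal sketch of the decoding step such a custom deserializer might perform, assuming the JavaScript producer writes both key and value as plain UTF-8 text. A likely reason for the EOFException above is that TypeInformationKeyValueSerializationSchema reads bytes through Flink's own serialization stack, which plain UTF-8 strings do not match. The names KeyedStringDecoder, KeyedMessage and decode are hypothetical and not part of Flink; the sketch uses only the Java standard library so it can be run on its own.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (not a Flink class): decodes a Kafka record whose key
 * and value are plain UTF-8 strings.
 *
 * Assumption: in a Flink 1.3-style job, a class implementing
 * KeyedDeserializationSchema<T> would call decode(...) from its
 * deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
 * method, return false from isEndOfStream(...), and describe T in getProducedType().
 */
public class KeyedStringDecoder {

    /** Simple immutable key/value pair; a Flink job could use Tuple2<String, String> instead. */
    public static final class KeyedMessage {
        public final String key;
        public final String value;

        KeyedMessage(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + value + ")";
        }
    }

    /** Kafka records may carry a null key or a null value, so both cases are handled. */
    public static KeyedMessage decode(byte[] messageKey, byte[] message) {
        String key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return new KeyedMessage(key, value);
    }

    public static void main(String[] args) {
        // Example record: a string key and a JSON-LD document sent as text.
        byte[] key = "sensor-1".getBytes(StandardCharsets.UTF_8);
        byte[] value = "{\"@id\":\"urn:example:1\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(key, value));
    }
}
```

With the key preserved this way, the stream could then be split by key (for example with keyBy), and the value string parsed as JSON-LD in a later step, which matches the "one problem at a time" approach preferred in the question.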
