Flink中不推荐使用JSONDeserializationSchema()吗? [英] is JSONDeserializationSchema() deprecated in Flink?
问题描述
我是Flink的新手,并且做的事情与下面的链接非常相似.
I am new to Flink and doing something very similar to the below link.
无法在下沉kafka流时看到消息,而在flink 1.2中看不到打印消息
我还试图将JSONDeserializationSchema()添加为我的没有密钥的Kafka输入JSON消息的反序列化器.
I am also trying to add JSONDeserializationSchema() as a deserializer for my Kafka input JSON message which is without a key.
但是我发现不存在JSONDeserializationSchema().
But I found JSONDeserializationSchema() is not present.
如果我做错了任何事情,请告诉我.
Please let me know if I am doing anything wrong.
推荐答案
JSONDeserializationSchema
在之前已被弃用之后,已在Flink 1.8中删除.
JSONDeserializationSchema
was removed in Flink 1.8, after having been deprecated earlier.
推荐的方法是编写一个实现DeserializationSchema<T>
的解串器.这是一个示例,我已从
The recommended approach is to write a deserializer that implements DeserializationSchema<T>
. Here's an example, which I've copied from the Flink Operations Playground:
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
* A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
*
*/
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
private static final long serialVersionUID = 1L;
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public ClickEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, ClickEvent.class);
}
@Override
public boolean isEndOfStream(ClickEvent nextElement) {
return false;
}
@Override
public TypeInformation<ClickEvent> getProducedType() {
return TypeInformation.of(ClickEvent.class);
}
}
对于Kafka制作人,您将要实现KafkaSerializationSchema<T>
,并且您将在同一项目中找到该示例.
For a Kafka producer you'll want to implement KafkaSerializationSchema<T>
, and you'll find examples of that in that same project.
这篇关于Flink中不推荐使用JSONDeserializationSchema()吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!