Flink中不推荐使用JSONDeserializationSchema()吗? [英] is JSONDeserializationSchema() deprecated in Flink?

查看:1854
本文介绍了Flink中不推荐使用JSONDeserializationSchema()吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Flink的新手,并且做的事情与下面的链接非常相​​似.

I am new to Flink and doing something very similar to the below link.

无法在下沉kafka流时看到消息,而在flink 1.2中看不到打印消息

我还试图将JSONDeserializationSchema()添加为我的没有密钥的Kafka输入JSON消息的反序列化器.

I am also trying to add JSONDeserializationSchema() as a deserializer for my Kafka input JSON message which is without a key.

但是我发现不存在JSONDeserializationSchema().

But I found JSONDeserializationSchema() is not present.

如果我做错了任何事情,请告诉我.

Please let me know if I am doing anything wrong.

推荐答案

JSONDeserializationSchema在之前已被弃用之后,已在Flink 1.8中删除.

JSONDeserializationSchema was removed in Flink 1.8, after having been deprecated earlier.

推荐的方法是编写一个实现DeserializationSchema<T>的解串器.这是一个示例,我已从

The recommended approach is to write a deserializer that implements DeserializationSchema<T>. Here's an example, which I've copied from the Flink Operations Playground:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
 * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
 *
 */
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {

    private static final long serialVersionUID = 1L;

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ClickEvent deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, ClickEvent.class);
    }

    @Override
    public boolean isEndOfStream(ClickEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ClickEvent> getProducedType() {
        return TypeInformation.of(ClickEvent.class);
    }
}

对于Kafka制作人,您将要实现KafkaSerializationSchema<T>,并且您将在同一项目中找到该示例.

For a Kafka producer you'll want to implement KafkaSerializationSchema<T>, and you'll find examples of that in that same project.

这篇关于Flink中不推荐使用JSONDeserializationSchema()吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆