How to implement FlinkKafkaProducer serializer for Kafka 2.2

Problem Description

I've been working on updating a Flink processor (Flink version 1.9) that reads from Kafka and then writes to Kafka. We wrote this processor to run against a Kafka 0.10.2 cluster, and we have now deployed a new Kafka cluster running version 2.2. Therefore I set out to update the processor to use the latest FlinkKafkaConsumer and FlinkKafkaProducer (as suggested by the Flink docs). However, I've run into some problems with the Kafka producer. I'm unable to get it to serialize data using the deprecated constructors (not surprising), and I've been unable to find any implementations or examples online of how to implement a serializer (all the examples use the older Kafka connectors).

The current implementation (for Kafka 0.10.2) is as follows:

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

When trying to implement the following FlinkKafkaProducer:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

I get the following error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

I haven't been able to figure out why. This constructor for FlinkKafkaProducer is also deprecated, and when I try to implement the non-deprecated constructor I can't figure out how to serialize the data. This is how it would look:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return null;
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

But I don't understand how to implement the KafkaSerializationSchema, and I can't find any examples of it online or in the Flink docs.

Does anyone have any experience implementing this, or any tips on why the FlinkKafkaProducer throws a NullPointerException in this step?

Recommended Answer

If you are just sending a String to Kafka:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public ProducerStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // Write the String as UTF-8 bytes into the record value; no key is set.
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}
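
A minimal usage sketch for the String case, reusing the "playerSessions" topic and producerProps from the question and assuming a DataStream<String> named stream already exists in your job:

// Sketch: wiring ProducerStringSerializationSchema into the new producer.
// "playerSessions" and producerProps come from the question; stream is an assumed DataStream<String>.
FlinkKafkaProducer<String> stringProducer = new FlinkKafkaProducer<>(
        "playerSessions",
        new ProducerStringSerializationSchema("playerSessions"),
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

stream.addSink(stringProducer);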

For sending a Java object:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;


public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo> {

    private final String topic;
    private transient ObjectMapper mapper;

    public ObjSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
        if (mapper == null) {
            // Created lazily on the task manager instead of being shipped with the job graph.
            mapper = new ObjectMapper();
        }
        try {
            // Serialize the POJO to JSON bytes and use them as the record value; no key is set.
            return new ProducerRecord<>(topic, mapper.writeValueAsBytes(obj));
        } catch (JsonProcessingException e) {
            // Fail the record instead of silently emitting a null value.
            throw new RuntimeException("Could not serialize record: " + obj, e);
        }
    }
}
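
MyPojo above is just a stand-in for your own event type; any POJO that Jackson can serialize will do. A hypothetical minimal example:

// Hypothetical POJO used only to make the example self-contained; replace with your own event class.
public class MyPojo {
    private String playerId;
    private long sessionLength;

    public MyPojo() {
        // Jackson needs a no-argument constructor.
    }

    public String getPlayerId() { return playerId; }
    public void setPlayerId(String playerId) { this.playerId = playerId; }

    public long getSessionLength() { return sessionLength; }
    public void setSessionLength(long sessionLength) { this.sessionLength = sessionLength; }
}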

In your code:

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
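
As for the NullPointerException: in Flink 1.9 the deprecated overload you called takes an Optional<FlinkKafkaPartitioner<IN>> as its last argument, so passing a bare null blows up inside the constructor. If you did want to stay on that overload, java.util.Optional.empty() is the value to pass when you have no custom partitioner; a sketch only, since the KafkaSerializationSchema approach above is the non-deprecated path:

// Sketch: the deprecated SerializationSchema-based overload expects an Optional partitioner,
// so pass Optional.empty() rather than null when no custom partitioner is used.
FlinkKafkaProducer<String> legacyProducer = new FlinkKafkaProducer<>(
        "playerSessions",
        new SimpleStringSchema(),
        producerProps,
        Optional.<FlinkKafkaPartitioner<String>>empty());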
