How to implement FlinkKafkaProducer serializer for Kafka 2.2


Problem description

I've been working on updating a Flink processor (Flink version 1.9) that reads from Kafka and then writes to Kafka. We originally wrote this processor to run against a Kafka 0.10.2 cluster, and we have now deployed a new Kafka cluster running version 2.2. I therefore set out to update the processor to use the latest FlinkKafkaConsumer and FlinkKafkaProducer (as suggested by the Flink docs). However, I've run into some problems with the Kafka producer. I'm unable to get it to serialize data using the deprecated constructors (not surprising), and I've been unable to find any implementations or examples online of how to implement a serializer (all the examples use the older Kafka connectors).

The current implementation (for Kafka 0.10.2) is as follows:

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

When trying to implement the following FlinkKafkaProducer

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

I get the following error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

and I haven't been able to figure out why. This constructor of FlinkKafkaProducer is also deprecated, and when I try to implement the non-deprecated constructor I can't figure out how to serialize the data. The following is how it would look:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return null;
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

But I don't understand how to implement the KafkaSerializationSchema, and I can't find any examples of this online or in the Flink docs.

Does anyone have experience implementing this, or any tips on why the FlinkKafkaProducer throws a NullPointerException in this step?

Recommended answer

If you are just sending a String to Kafka:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public ProducerStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // Kafka records carry byte-array keys and values; here the key is left null
        // and the String element is written as the UTF-8 encoded value.
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}
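
For completeness, here is a minimal sketch (not part of the original answer) of plugging this schema into the non-deprecated constructor shown in the question; producerProps and the playerSessionStream stream are assumed to already exist in your job:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<>(
        "playerSessions",                                         // default topic
        new ProducerStringSerializationSchema("playerSessions"),  // schema from above
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);

// "playerSessionStream" is a hypothetical DataStream<String> from your pipeline.
playerSessionStream.addSink(eventBatchFlinkKafkaProducer);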

For sending a Java Object:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;


public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo> {

    private final String topic;
    private transient ObjectMapper mapper;

    public ObjSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
        // The schema instance is serialized and shipped to the task managers, so the
        // ObjectMapper is created lazily on first use instead of in the constructor.
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        byte[] b;
        try {
            b = mapper.writeValueAsBytes(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize record to JSON: " + obj, e);
        }
        return new ProducerRecord<byte[], byte[]>(topic, b);
    }
}

In your code:

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
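
One practical note that is not part of the original answer: with FlinkKafkaProducer.Semantic.EXACTLY_ONCE the producer uses Kafka transactions, and Flink's default producer transaction timeout (1 hour) exceeds the broker default transaction.max.timeout.ms (15 minutes), so the job will typically fail to start unless one of the two is adjusted. Lowering the client-side timeout in the properties you already pass to the producer is the simpler option:

// Assumption: producerProps / params.getProperties() is the Properties object handed to the producer.
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes, within the broker default limit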
