Kafka: writing custom serializer

Question

I am trying to build a POC with Kafka 0.8.1. I am using my own Java class as the Kafka message; it has a bunch of String fields. I cannot use the default serializer class or the String serializer class that comes with the Kafka library. I guess I need to write my own serializer and pass it to the producer properties. If you know of an example custom serializer for Kafka (in Java), please share. Thanks a lot.

Recommended answer

The things required for writing a custom serializer are:


  1. Implement Encoder<T>, specifying your message type for the generic parameter T
    • Supplying a constructor that takes VerifiableProperties is required
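To illustrate why that constructor matters: as far as I know, the 0.8 producer creates the configured encoder by reflection, looking up a constructor that takes the producer's properties. The sketch below imitates that lookup with stand-in types so it runs without Kafka on the classpath. The `Encoder` and `VerifiableProperties` types here are simplified stand-ins for Kafka's, and `EncoderLoader`, `UpperCaseEncoder` and `BrokenEncoder` are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Properties;

// Stand-in for kafka.serializer.Encoder (assumption: simplified for illustration).
interface Encoder<T> {
    byte[] toBytes(T value);
}

// Stand-in for kafka.utils.VerifiableProperties (assumption: simplified for illustration).
final class VerifiableProperties {
    final Properties props;

    VerifiableProperties(Properties props) {
        this.props = props;
    }
}

// Hypothetical encoder that has the required one-argument constructor.
final class UpperCaseEncoder implements Encoder<String> {
    public UpperCaseEncoder(VerifiableProperties props) {
        // Nothing to configure in this sketch.
    }

    @Override
    public byte[] toBytes(String value) {
        return value.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical encoder that is missing the constructor; it compiles but cannot be loaded.
final class BrokenEncoder implements Encoder<String> {
    @Override
    public byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical loader that imitates a reflective lookup by class name.
final class EncoderLoader {
    private EncoderLoader() {}

    @SuppressWarnings("unchecked")
    static <T> Encoder<T> load(String className, VerifiableProperties props) {
        try {
            // Only a public constructor taking VerifiableProperties is accepted.
            Constructor<?> ctor = Class.forName(className).getConstructor(VerifiableProperties.class);
            return (Encoder<T>) ctor.newInstance(props);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create encoder " + className, e);
        }
    }
}
```

In this sketch, loading `UpperCaseEncoder` succeeds, while loading `BrokenEncoder` fails only when `load` is called, which suggests that a missing constructor tends to surface when the producer starts rather than when the class is compiled.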



Declaring a custom serializer for a producer

As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.

If you follow Kafka's Producer Example, you will construct ProducerConfig from a Properties object. When building your properties, be sure to include:

props.put("serializer.class", "path.to.your.CustomSerializer");

The value is the fully qualified name of the class you want Kafka to use to serialize messages before appending them to the log.
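As a rough sketch of how the producer properties might be assembled, the helper below collects the settings in one place. `ProducerProps` and `build` are hypothetical names, and `metadata.broker.list` and `request.required.acks` are 0.8 producer settings that the answer does not mention; they are included as assumptions about a typical setup.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: gathers the producer settings in one place.
final class ProducerProps {
    private ProducerProps() {}

    static Properties build(String brokerList, String serializerClass) {
        Properties props = new Properties();
        // Brokers the 0.8 producer contacts to fetch topic metadata (host:port,host:port).
        props.put("metadata.broker.list", brokerList);
        // Fully qualified name of the Encoder implementation used for message values.
        props.put("serializer.class", serializerClass);
        // Assumption: wait for the partition leader to acknowledge each write.
        props.put("request.required.acks", "1");
        return props;
    }
}
```

With Kafka on the classpath, the result would be passed to `new ProducerConfig(props)` and that config to the Producer constructor. If your message keys are plain Strings, you may also need to set `key.serializer.class` (for example to `kafka.serializer.StringEncoder`), since in 0.8 the key serializer defaults to the same class as `serializer.class`.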

Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing a Scala trait in Java is a little awkward, but the following worked for serializing JSON in my project:

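// Imports assume Jackson 2.x and log4j 1.x, which match the calls used below.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // Instantiating ObjectMapper is expensive, so share one instance. In real life, prefer injecting it.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka creates the encoder reflectively and passes in the producer's properties, so this constructor must be present. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            // writeValueAsBytes encodes the JSON as UTF-8, independent of the platform default charset.
            return objectMapper.writeValueAsBytes(object);
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        // Fall back to an empty payload when serialization fails.
        return new byte[0];
    }
}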

Your question makes it sound like you are using one object type (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:

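package com.project.serializer;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// CustomMessage is assumed to live in this package; otherwise import it here.
public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* Kafka creates the encoder reflectively and passes in the producer's properties, so this constructor must be present. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        // Delegate to the message's own byte representation.
        return customMessage.toBytes();
    }
}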

Which would leave your property config looking like this:

props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
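The encoder above relies on a `CustomMessage.toBytes()` method that the answer does not show. One possible shape, assuming a class with a couple of non-null String fields, is to write each field as a 4-byte length followed by its UTF-8 bytes. The field names `id` and `body` and the `fromBytes` helper are hypothetical and only illustrate the idea.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical message class; the field names are illustrative only.
final class CustomMessage {
    final String id;
    final String body;

    CustomMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }

    // Encodes each field as a 4-byte length followed by its UTF-8 bytes.
    byte[] toBytes() {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            writeField(out, id);
            writeField(out, body);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            // Writing to an in-memory buffer is not expected to fail.
            throw new IllegalStateException(e);
        }
    }

    // Inverse of toBytes(), e.g. for a matching consumer-side decoder.
    static CustomMessage fromBytes(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            return new CustomMessage(readField(in), readField(in));
        } catch (IOException e) {
            throw new IllegalArgumentException("Malformed CustomMessage payload", e);
        }
    }

    private static void writeField(DataOutputStream out, String value) throws IOException {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    private static String readField(DataInputStream in) throws IOException {
        byte[] utf8 = new byte[in.readInt()];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

Length-prefixing avoids having to pick a delimiter that can never appear inside a field. If the consumers are not written in Java, a format such as JSON (as in the JsonEncoder above) may be easier to share.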
