Avro custom decoding of UUID through Kafka on the consumer end


Question

I've written a class to custom-encode objects of UUID type into bytes so they can be transported across Kafka with Avro.

To use this class, I put @AvroEncode(using = UUIDAsBytesEncoding.class) above the uuid field in my target object. (This is implemented by the Apache Avro reflect library.)
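For context, the annotated target object might look like the following. This is a sketch of my own, not code from the post; the field names are taken from the Request schema registered in the Schema Registry, and the @Nullable annotation is an assumption to match the ["null", "bytes"] union the encoding declares (it requires Avro's reflect library on the classpath, so it is not runnable standalone):

```java
import java.util.UUID;
import org.apache.avro.reflect.AvroEncode;
import org.apache.avro.reflect.Nullable;

// Hypothetical target object; field names mirror the registered schema.
public class Request {
    private String password;
    private String email;

    // Tell Avro reflection to delegate (de)serialization of this
    // field to the custom CustomEncoding implementation.
    @Nullable
    @AvroEncode(using = UUIDAsBytesEncoding.class)
    private UUID id;
}
```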

I'm having difficulty figuring out how to make my consumer use the custom decoder automatically (or do I have to go in and decode it manually?).

Here is my UUIDAsBytesEncoding extends CustomEncoding class:

public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");

        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if(datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);

            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum),bytes,0,16);

            // encode length of data
            out.writeLong(16);

            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        System.out.println("READING");
        Long size = in.readLong();
        Long leastSig = in.readLong();
        Long mostSig = in.readLong();
        return new UUID(mostSig, leastSig);
    }
}

The write method and the encoding work well, but the read method never gets called during deserialization. How would I implement this in a consumer?

The schema in the registry looks like:

{"type":"record","name":"Request","namespace":"xxxxxxxx.xxx.xxx","fields":[{"name":"password","type":"string"},{"name":"email","type":"string"},{"name":"id","type":["null",{"type":"bytes","CustomEncoding":"UUIDAsBytesEncoding"}],"default":null}]}

If the consumer can't automatically use that information to invoke the UUIDAsBytesEncoding read method, how would I find the data marked with that tag in my consumer?

I am using the Confluent Schema Registry as well.

Any help would be appreciated!

Answer

I ended up finding the solution. The encoding was incorrect: the built-in writeBytes() method automatically writes the length prefix for you.
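To make the 16-byte layout concrete: a UUID is just two longs, so packing and unpacking it is a simple ByteBuffer round trip. This sketch is mine, not from the post, and it uses most-significant-long-first ordering, which is not necessarily the same byte order that commons-lang's Conversion.uuidToByteArray produces; it only illustrates why no separate length field is needed once writeBytes() adds its own prefix:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidBytes {
    // Pack a UUID into exactly 16 bytes, most-significant long first.
    static byte[] toBytes(UUID u) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(u.getMostSignificantBits());
        buf.putLong(u.getLeastSignificantBits());
        return buf.array();
    }

    // Reverse of toBytes(): read the two longs back in the same order.
    static UUID fromBytes(byte[] b) {
        ByteBuffer buf = ByteBuffer.wrap(b);
        return new UUID(buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        byte[] bytes = toBytes(original);
        System.out.println(bytes.length);                      // 16
        System.out.println(original.equals(fromBytes(bytes))); // true
    }
}
```

Because the payload is always 16 bytes, handing it to Avro's writeBytes() is enough: the encoder prepends the length itself, which is why the original write() method double-counted by also calling writeLong(16).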

Then, in the consumer, we have to go through a GenericDatumWriter, write to a binary stream, and then read from that binary stream with a ReflectDatumReader. This automatically calls the UUIDAsBytesEncoding read() method and deserializes the UUID.

My consumer would look something like this (as part of a consumer group executor service, walkthrough here):

/**
 * Start a single consumer instance
 * This will use the schema built into the IndexedRecord to decode and create key/value for the message
 */
public void run() {
    ConsumerIterator it = this.stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        try {
            String key = (String) messageAndMetadata.key();
            IndexedRecord value = (IndexedRecord) messageAndMetadata.message();

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            GenericDatumWriter<Object> genericRecordWriter = new GenericDatumWriter<>(value.getSchema());
            genericRecordWriter.write(value, EncoderFactory.get().directBinaryEncoder(bytes, null));

            ReflectDatumReader<T> reflectDatumReader = new ReflectDatumReader<>(value.getSchema());
            T newObject = reflectDatumReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
            IOUtils.closeQuietly(bytes);

            System.out.println("************CONSUMED:  " + key + ": "+ newObject);

        } catch(SerializationException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    System.out.println("Shutting down Thread: " + this.threadNumber);
}

Then the new UUIDAsBytesEncoding would look like:

public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");

        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if(datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);

            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum), bytes, 0, 16);

            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        // get index in union
        int index = in.readIndex();
        if (index == 1) {
            // read in 16 bytes of data
            ByteBuffer b = ByteBuffer.allocate(16);
            in.readBytes(b);

            // convert
            UUID uuid = Conversion.byteArrayToUuid(b.array(), 0);

            return uuid;
        } else {
            // no uuid present
            return null;
        }
    }
}

This is also an example of how to implement a CustomEncoding Avro class. The current version of Avro does not have a built-in UUID serializer, so this is a solution to that problem.

