Unable to read avro messages using kafka-avro-console-consumer. SerializationException: Unknown magic byte

Question

I am writing a REST proxy like the Confluent REST Proxy. It takes a JSON payload, a schema subject, and an id, and then writes the JSON payload as an Avro object into the stream. When I use kafka-avro-console-consumer to read the message, I am getting "unknown magic byte" errors.

Here is my Kafka producer config:

        properties.put("client.id", LocalHostUtils.getLocalHostName(null));

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

        properties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        properties.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class);

        properties.put("schema.registry.url", configValuesManager.getString("dsp_kafka.schema_registry"));

        if (KafkaUtils.isKafkaEnabled()) {
            this.kafkaProducer = new KafkaProducer<String, Object>(properties);
        }
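
Worth noting: with auto.register.schemas set to false and RecordNameStrategy as the subject strategy, the value schema must already exist in the registry under a subject equal to the record's fully-qualified name. Below is a minimal sketch of pre-registering such a schema; the registry URL, record name, and schema JSON are placeholders I made up, not from my setup:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import org.apache.avro.Schema;

    // Sketch only: pre-register the value schema so the producer can serialize
    // with auto.register.schemas=false. All names and URLs are placeholders.
    String schemaJson = "{\"type\":\"record\",\"name\":\"MyRecord\","
            + "\"namespace\":\"com.example\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
    SchemaRegistryClient registry = new CachedSchemaRegistryClient("http://schema-registry:8081", 100);
    Schema valueSchema = new Schema.Parser().parse(schemaJson);
    // With RecordNameStrategy the subject must equal the record's full name
    int registeredId = registry.register("com.example.MyRecord", valueSchema);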

This is how the REST controller converts the incoming JSON to Avro:

        Schema schema = null;
        try {
            // Fetch the writer's schema from the registry by subject and id
            schema = schemaRegistryClient.getBySubjectAndID(schemaSubject, schemaId);
        } catch (RestClientException e) {
            throw new IOExceptionWithCause(e);
        }

        log.debug(postContent);
        log.info("Subject/Version {}/{} -> {}", schemaSubject, schemaId, schema);
        Object data = toAvro(schema, postContent);

Here is the implementation of the toAvro method:

    Object toAvro(Schema schema, String jsonBody) throws IOException
    {
        // Decode the JSON payload into a generic Avro record using the given schema
        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object object = reader.read(
                null, decoderFactory.jsonDecoder(schema, jsonBody));

        return object;
    }
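
For reference, a minimal sketch of exercising toAvro; the schema and payload below are made up for illustration, not from the actual service:

        // Illustrative only: decode a JSON payload against a trivial record schema
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        Object record = toAvro(schema, "{\"name\": \"alice\"}");
        // record is a GenericData.Record; KafkaAvroSerializer frames it with the
        // magic byte and schema id when the producer sends it
        log.debug("decoded {}", record);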

This object is then passed to the schemaValidatingProducer that I configured with the properties given above:

        this.kafkaSchemaValidatingProducer.publish(topic, 0, UUID.randomUUID().toString(), data);

Here is the kafkaSchemaValidatingProducer:

    public void publish(String topic, Integer partition, String key, Object data)
    {
        log.debug("publish topic={} key={} value={}", topic, key, data);

        if (!KafkaUtils.isKafkaEnabled()) {
            log.warn("Kafka is not enabled....");
            return;
        }

        // Note: the partition argument is never used; this constructor lets the
        // producer's partitioner choose the partition from the key
        ProducerRecord<String, Object> record = new ProducerRecord<String, Object>(topic, key, data);

        Future<RecordMetadata> metadataFuture = kafkaProducer.send(record, new Callback()
        {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception)
            {
                if (exception == null) {
                    log.info(metadata.toString());
                    return;
                }

                log.error("exception", exception);
            }
        });

        // Flushing on every publish forces delivery before returning,
        // at the cost of throughput
        kafkaProducer.flush();
    }

This is how I am reading the topic:

./bin/kafka-avro-console-consumer --bootstrap-server kafka-broker1:9021 --consumer.config client-ssl.properties --topic schema-validated-topic --property print.key=true --property print.value=true --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --offset earliest --skip-message-on-error --partition 0 --property schema.registry.url http://schema-regisry

This results in...

[2019-08-26 16:30:36,351] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
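
For context: every message written by KafkaAvroSerializer starts with a magic byte of 0 followed by a 4-byte schema id, and the deserializer rejects anything that does not match with exactly this exception (the "id -1" in the log is the placeholder it reports when framing fails before the id is read). A simplified sketch of that check, not the actual Confluent source:

    import java.nio.ByteBuffer;

    // Simplified sketch of the Confluent wire-format check (not the real code):
    // byte 0 must be the magic byte 0x0, bytes 1-4 are the schema id
    static int readSchemaId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != 0x0) {
            throw new IllegalStateException("Unknown magic byte!");
        }
        return buffer.getInt();
    }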

Any idea why I am getting the "Bad magic number error"?

Answer

I figured out the problem. I was not specifying the key deserializer in my command, so the console consumer tried to decode the plain String keys as Confluent-framed Avro, and the first byte of a UUID string is not the expected magic byte 0.

Here is the working command:

./bin/kafka-avro-console-consumer \
--bootstrap-server <bootstrap-server> \
--consumer.config client-ssl.properties \
--property schema.registry.url=<schema-registry-url> \
--topic <name-of-topic> \
--property print.key=true \
--property print.value=true \
--value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer \
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer
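
For completeness, a rough Java equivalent of that consumer with the same key/value deserializer pairing; the broker address, registry URL, group id, and poll timeout below are placeholders:

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    // Sketch only: placeholders throughout
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9021");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-debug");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Keys are plain strings, values are Confluent-framed Avro
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put("schema.registry.url", "http://schema-registry:8081");

    try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("schema-validated-topic"));
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(5));
        records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
    }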
