Kafka Streams - SerializationException: Unknown magic byte

Problem Description

I am trying to create a Kafka Streams Application which processes Avro records, but I am getting the following error:

Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I am not sure what is causing this error. I am just trying to get Avro records into the application first, where they will then be processed and output to another topic, but it does not seem to be working. I have included the code from the application below. Can anyone see why it is not working?

    // Streams configuration: note that the default key and value serdes are both String
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Serdes: String for keys, Confluent SpecificAvroSerde for the trackingReport values
    Serde<String> stringSerde = Serdes.String();
    Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();
    specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);

    // Topology: read from the input topic, pass records through unchanged, write them out
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));

    KStream<String, trackingReport> outputreports = inputreports;

    String outputTopic = "outtesttopic";
    outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));

    Topology topology = builder.build();

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();

Answer

Unknown magic byte!

Means your data does not adhere to the wire format that's expected for the Schema Registry.

Or, in other words, the data you're trying to read is not Avro, as expected by the Confluent Avro deserializer.
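For context, the Confluent wire format starts with a single magic byte (0x0) followed by a 4-byte schema registry ID, and only then the Avro payload. A rough way to see what is actually on the topic is to read the raw bytes with a plain consumer and inspect the first byte. This is a minimal sketch, assuming the topic name and broker address from the question (imports omitted, as in the snippet above):

    // Consume raw bytes and check whether each record starts with the Confluent
    // wire-format magic byte (0x0) followed by a 4-byte schema id.
    Properties checkProps = new Properties();
    checkProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    checkProps.put(ConsumerConfig.GROUP_ID_CONFIG, "magic-byte-check");   // throwaway group id
    checkProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    checkProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    checkProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(checkProps)) {
        consumer.subscribe(Collections.singletonList("intesttopic"));
        for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(5))) {
            byte[] value = rec.value();
            boolean confluentAvro = value != null && value.length > 5 && value[0] == 0x0;
            System.out.println("offset " + rec.offset() + " looks like Confluent Avro: " + confluentAvro);
        }
    }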

You can expect the same error by running kafka-avro-console-consumer, by the way, so you may want to debug using that too
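For example, something along these lines (topic and addresses taken from the question) should reproduce the same exception if the data is not Confluent-Avro encoded:

    kafka-avro-console-consumer --bootstrap-server localhost:9092 \
      --topic intesttopic --from-beginning \
      --property schema.registry.url=http://localhost:8081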

If you are sure your data is indeed Avro, and the schema is actually sent as part of the message (would need to see your producer code), then you shouldn't use the Confluent Avro deserializers, which expect a specific byte format in the message. Instead, you could use ByteArrayDeserializer and read the Avro record yourself, then pass it to the Apache Avro BinaryDecoder class. As a bonus, you can extract that logic into your own Deserializer class.
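A minimal sketch of such a custom deserializer, assuming the bytes are plain Avro binary (no Confluent magic byte / schema ID header) and reusing the generated trackingReport class from the question; the class name TrackingReportDeserializer is made up for illustration:

    import java.io.IOException;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    // Hypothetical deserializer for "plain" Avro binary values, i.e. records written
    // with Avro's BinaryEncoder and no Confluent wire-format header.
    public class TrackingReportDeserializer implements Deserializer<trackingReport> {

        private final DatumReader<trackingReport> reader =
                new SpecificDatumReader<>(trackingReport.class);

        @Override
        public trackingReport deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                // Decode the raw bytes against the schema compiled into trackingReport
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                return reader.read(null, decoder);
            } catch (IOException e) {
                throw new SerializationException("Failed to decode Avro record", e);
            }
        }
    }

For Kafka Streams you could then pair it with a matching serializer via Serdes.serdeFrom(serializer, deserializer) and pass that serde in Consumed.with / Produced.with instead of SpecificAvroSerde.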

Also, if the input topic is Avro, I don't think you should be using this property for reading strings

DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
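If the topic values really are Confluent-serialized Avro, one option (a sketch, reusing the serde classes already imported in the question) is to make the Avro serde the default value serde rather than Serdes.String(). Note that the Consumed.with / Produced.with overrides in the question already bypass the defaults, so the defaults only matter for operations that don't specify serdes explicitly:

    // Default the value serde to the Confluent Avro serde instead of String;
    // the schema.registry.url property is still required.
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");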
