Kafka Avro Consumer with Decoder issues


Problem description

When I attempt to run a Kafka consumer with Avro over the data with my respective schema, it returns the error "AvroRuntimeException: Malformed data. Length is negative: -40". I see others have had similar issues (converting a byte array to JSON, Avro write and read, and Kafka Avro Binary *coder). I have also referenced this Consumer Group Example. All of these have been helpful, but none with this error so far. The code works up until this part (line 73):

Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);
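
For context, a minimal sketch of the kind of method this decode step presumably sits in; the class name, method signature, and use of GenericDatumReader are my assumptions, not code from the question:

```java
import java.io.ByteArrayInputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public class AvroMessageDecoder {
    // Decodes a single raw (schema-less) Avro record from a Kafka message payload.
    static GenericRecord decode(Schema schema, byte[] messageBytes) throws Exception {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(messageBytes);
        Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        // The read below throws "AvroRuntimeException: Malformed data. Length is
        // negative: ..." when the bytes are not raw binary-encoded records
        // matching the given schema.
        return reader.read(null, decoder);
    }
}
```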

I have tried other decoders and printed out the contents of the byteArrayInputStream variable, which looks like what I believe you would expect serialized Avro data to look like (in the message I can see the schema, some data, and some malformed data). I have also printed the bytes available using the .available() method, which returns 594. I am having trouble understanding why this error is happening. Apache Nifi is used to produce the Kafka stream, with the same schema, from HDFS. I would appreciate any help.

Recommended answer

Perhaps the problem is a mismatch between how the Avro data is written (encoded) by Nifi vs. how your consumer app is reading (decoding) the data.

In a nutshell, Avro's API provides two different approaches to serialization:

  1. For creating proper Avro files: to encode the data records but also to embed the Avro schema in a kind of preamble (via org.apache.avro.file.{DataFileWriter/DataFileReader}). Embedding the schema into Avro files makes a lot of sense because (a) typically the "payload" of Avro files is orders of magnitude larger than the embedded Avro schema, and (b) you can then copy or move those files around to your heart's content and still be sure you can read them again without having to consult someone or something.
  2. To encode only the data records, i.e. to not embed the schema (via org.apache.avro.io.{BinaryEncoder/BinaryDecoder}; note the difference in the package name: io here vs. file above). This approach is often favored when Avro-encoding messages that are being written to a Kafka topic because, in comparison to variant 1 above, you do not incur the overhead of re-embedding the Avro schema into every single message, assuming your (very reasonable) policy is that, for the same Kafka topic, messages are formatted/encoded with the same Avro schema. This is a significant advantage because, in a stream-data context, a data-in-motion record is typically much smaller (commonly between 100 bytes and a few hundred KB) than the data-at-rest Avro files described above (often hundreds or thousands of MB); so the Avro schema is relatively large by comparison, and thus you don't want to embed it 2000x when writing 2000 data records to Kafka. The drawback is that you must "somehow" track how Avro schemas map to Kafka topics -- or, more precisely, you must somehow track with which Avro schema a message was encoded without going down the path of embedding the schema directly. The good news is that there is tooling available in the Kafka ecosystem (Avro schema registry) for doing this transparently. So, in comparison to variant 1, variant 2 gains efficiency at the expense of convenience. (A sketch of both variants follows this list.)

The effect is that the "wire format" for encoded Avro data will look different depending on whether you use (1) or (2) above.

I am not very familiar with Apache Nifi, but a quick look at the source code (e.g. ConvertAvroToJSON.java) suggests to me that it is using variant 1, i.e. it embeds the Avro schema alongside the Avro records. Your consumer code, however, uses DecoderFactory.get().binaryDecoder() and thus variant 2 (no schema embedded).

Perhaps this explains the error you have been running into?
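
If so, one way to verify (and to actually read the data) is to treat each message payload as an Avro container file and let Avro recover the embedded schema itself. This is a sketch under that assumption; messageBytes and the class name are placeholders of mine:

```java
import java.io.ByteArrayInputStream;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class EmbeddedSchemaReader {
    // Reads a payload written in the container-file format (variant 1):
    // DataFileStream parses the header and recovers the writer's schema.
    static void dump(byte[] messageBytes) throws Exception {
        try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                new ByteArrayInputStream(messageBytes),
                new GenericDatumReader<GenericRecord>())) {
            System.out.println("writer schema: " + stream.getSchema());
            for (GenericRecord record : stream) {
                System.out.println(record);
            }
        }
    }
}
```

If this reads your messages cleanly while binaryDecoder() fails, the mismatch described above is confirmed.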
