Kafka Avro Consumer with Decoder issues

Question

When I attempted to run the Kafka consumer with Avro over the data with my respective schema, it returns the error "AvroRuntimeException: Malformed data. Length is negative: -40". I see others have had similar issues converting a byte array to JSON, with Avro write and read, and with the Kafka Avro Binary *coder. I have also referenced this Consumer Group Example, all of which have been helpful, but none of it has helped with this error so far. It works up until this part of the code (line 73):

Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);

I have tried other decoders and printed out the contents of the byteArrayInputStream variable, which looks like what I believe you would expect serialized Avro data to look like (in the message I can see the schema, some data, and some malformed data). I have also printed out the bytes available using the .available() method, which returns 594. I am having trouble understanding why this error is happening. Apache NiFi is used to produce the Kafka stream with the same schema from HDFS. I would appreciate any help.

Answer

Perhaps the problem is a mismatch between how the Avro data is written (encoded) by NiFi and how your consumer app is reading (decoding) the data.

In a nutshell, Avro's API provides two different approaches to serialization:


  1. For creating proper Avro files: to encode the data records but also to embed the Avro schema in a kind of preamble (via org.apache.avro.file.{DataFileWriter/DataFileReader}). Embedding the schema into Avro files makes a lot of sense because (a) typically the "payload" of Avro files is orders of magnitude larger than the embedded Avro schema and (b) you can then copy or move those files around to your heart's content and still be sure you can read them again without having to consult someone or something.
  2. To encode only the data records, i.e. to not embed the schema (via org.apache.avro.io.{BinaryEncoder/BinaryDecoder}; note the difference in the package name: io here vs. file above). This approach is often favored when Avro-encoding messages that are being written to a Kafka topic, because in comparison to variant 1 above you do not incur the overhead of re-embedding the Avro schema into every single message, assuming your (very reasonable) policy is that, for the same Kafka topic, messages are formatted/encoded with the same Avro schema. This is a significant advantage because, in a stream-data context, a data-in-motion record is typically much smaller (commonly between 100 bytes and a few hundred KB) than the data-at-rest Avro files described above (often hundreds or thousands of MB); relative to such small messages the Avro schema is comparatively large, so you don't want to embed it 2000x when writing 2000 data records to Kafka. The drawback is that you must "somehow" track how Avro schemas map to Kafka topics -- or, more precisely, you must somehow track with which Avro schema a message was encoded without going down the path of embedding the schema directly. The good news is that there is tooling available in the Kafka ecosystem (Avro schema registry) for doing this transparently. So in comparison to variant 1, variant 2 gains on efficiency at the expense of convenience.
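To make the two variants concrete, here is a minimal sketch using Avro's GenericRecord API. The one-field User schema, the file name, and the class name are made up purely for illustration:

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import java.io.ByteArrayOutputStream;
    import java.io.File;

    public class AvroVariantsSketch {
        // One-field schema, made up for illustration only.
        static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}");

        public static void main(String[] args) throws Exception {
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("name", "alice");

            // Variant 1: Avro container file. create() writes the schema
            // into the file's preamble, so a reader needs no schema up front.
            File file = new File("users.avro");
            try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
                writer.create(SCHEMA, file);
                writer.append(record);
            }
            try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
                System.out.println("variant 1: " + reader.next());
            }

            // Variant 2: bare binary encoding. Only the record bytes are
            // produced (e.g. the value of a Kafka message); the reader must
            // already know the writer's schema.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
            encoder.flush();
            byte[] payload = out.toByteArray();

            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
            GenericRecord decoded =
                new GenericDatumReader<GenericRecord>(SCHEMA).read(null, decoder);
            System.out.println("variant 2: " + decoded);
        }
    }

Feeding variant-1 bytes to binaryDecoder() (or vice versa) is exactly the kind of mismatch that surfaces as "Malformed data" errors.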

The effect is that the "wire format" for encoded Avro data will look different depending on whether you use (1) or (2) above.

I am not very familiar with Apache NiFi, but a quick look at the source code (e.g. ConvertAvroToJSON.java) suggests to me that it is using variant 1, i.e. it embeds the Avro schema alongside the Avro records. Your consumer code, however, uses DecoderFactory.get().binaryDecoder() and thus variant 2 (no schema embedded).

Perhaps this explains the error you have been running into?
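If so, a quick way to confirm it is to decode the message bytes with the file-oriented reader instead of binaryDecoder(). Here is a minimal sketch, assuming each consumed Kafka message value arrives as a byte[] containing a complete Avro container (schema preamble plus records); the class and method names are hypothetical:

    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class NifiMessageDecoder {
        // Treats the message value as a self-describing Avro "file"
        // (variant 1): the writer's schema is read from the preamble,
        // so none has to be supplied here.
        public static List<GenericRecord> decode(byte[] messageBytes) throws IOException {
            List<GenericRecord> records = new ArrayList<>();
            try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                    new ByteArrayInputStream(messageBytes),
                    new GenericDatumReader<GenericRecord>())) {
                for (GenericRecord r : stream) {
                    records.add(r);
                }
            }
            return records;
        }
    }

If this iterates the records cleanly where binaryDecoder() failed, the messages are indeed variant 1, and the consumer is the side to adjust.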
