kafka consumer .net 'Protocol message end-group tag did not match expected tag.'


Problem description

I am trying to read data from Kafka, as you can see:

var config = new ConsumerConfig
{
    BootstrapServers = "*******",
    GroupId = Guid.NewGuid().ToString(),
    AutoOffsetReset = AutoOffsetReset.Earliest
};
MessageParser<AdminIpoChange> parser = new(() => new AdminIpoChange());
using (var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build())
{
    consumer.Subscribe("AdminIpoChange");

    while (true)
    {
        AdminIpoChange item = new AdminIpoChange();
        var cr = consumer.Consume();

        item = parser.ParseFrom(new ReadOnlySpan<byte>(cr.Message.Value).ToArray());
    }

    consumer.Close();
}

I use Google Protobuf to send and receive the data. This code returns the following error at the parser line:

 KafkaConsumer.ConsumeAsync: Protocol message end-group tag did not match expected tag.
Google.Protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
   at Google.Protobuf.ParsingPrimitivesMessages.CheckLastTagWas(ParserInternalState& state, UInt32 expectedTag)
   at Google.Protobuf.ParsingPrimitivesMessages.ReadGroup(ParseContext& ctx, Int32 fieldNumber, UnknownFieldSet set)
   at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(ParseContext& ctx)
   at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(UnknownFieldSet unknownFields, ParseContext& ctx)
   at AdminIpoChange.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 213
   at Google.Protobuf.ParsingPrimitivesMessages.ReadRawMessage(ParseContext& ctx, IMessage message)
   at Google.Protobuf.CodedInputStream.ReadRawMessage(IMessage message)
   at AdminIpoChange.MergeFrom(CodedInputStream input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 188
   at Google.Protobuf.MessageExtensions.MergeFrom(IMessage message, Byte[] data, Boolean discardUnknownFields, ExtensionRegistry registry)
   at Google.Protobuf.MessageParser`1.ParseFrom(Byte[] data)
   at infrastructure.Queue.Kafka.KafkaConsumer.ConsumeCarefully[T](Func`2 consumeFunc, String topic, String group) in D:\MofidProject\infrastructure\Queue\Kafka\KafkaConsumer.cs:line 168

D:\MofidProject\mts.consumer.plus\bin\Debug\net6.0\mts.consumer.plus.exe (process 15516) exited with code -1001.
To automatically close the console when debugging stops, enable Tools->Options->Debugging->Automatically close the console when debugging stops.

Update:

My sample data from Kafka:

 - {"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}

My RLC model:

syntax = "proto3";

message AdminIpoChange
{
  string Id = 1;
  string SymbolName = 2;
  string SymbolIsin = 3;
  string Date = 4;
  string Time = 5;
  double MinPrice = 6;
  double MaxPrice = 7;
  int32 Share = 8;
  bool Show = 9;
  int32 Operation = 10;
  string CreateDateTime = 11;

  enum AdminIpoOperation
  {
    Add = 0;
    Edit = 1;
    Delete = 2;
  }
}

My data (bytes):

7B 22 53 79 6D 62 6F 6C 4E 61 6D 65 22 3A 22 5C 75 30 36 34 31 5C 75 30 36 32 46 5C 75 30 
36 33 31 22 2C 22 53 79 6D 62 6F 6C 49 73 69 6E 22 3A 22 49 52 6F 33 70 7A 41 5A 30 30 30 
32 22 2C 22 44 61 74 65 22 3A 22 31 34 30 30 2F 31 32 2F 31 35 22 2C 22 54 69 6D 65 22 3A 
22 30 38 3A 30 30 2D 31 32 3A 30 30 22 2C 22 4D 69 6E 50 72 69 63 65 22 3A 31 37 37 32 36 
2C 22 4D 61 78 50 72 69 63 65 22 3A 32 31 36 36 36 2C 22 53 68 61 72 65 22 3A 31 30 30 30 
2C 22 53 68 6F 77 22 3A 66 61 6C 73 65 2C 22 4F 70 65 72 61 74 69 6F 6E 22 3A 30 2C 22 49 
64 22 3A 22 31 30 30 64 38 65 30 62 35 34 31 35 34 65 39 64 39 30 32 30 35 34 62 66 66 31 
39 33 65 38 37 35 22 2C 22 43 72 65 61 74 65 44 61 74 65 54 69 6D 65 22 3A 22 32 30 32 32 
2D 30 32 2D 32 36 54 30 39 3A 34 37 3A 32 30 2E 30 31 33 34 37 35 37 2B 30 33 3A 33 30 22 
7D 

Recommended answer

The data is definitely not protobuf binary; byte 0 starts a group with field number 15; inside this group is:

  • field 4, string
  • field 13, fixed32
  • field 6, varint
  • field 12, fixed32
  • field 6, varint

After this (at byte 151), an end-group token is encountered with field number 6.
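That initial decode can be reproduced by hand: a protobuf tag byte packs the field number into its upper bits and the wire type into its lowest three bits. A minimal sketch (in Python for brevity, and using only the first byte of the dump, 0x7B, which is ASCII '{'):

```python
# Wire types as defined by the protobuf encoding spec
WIRE_TYPES = {0: "varint", 1: "fixed64", 2: "length-delimited",
              3: "start-group", 4: "end-group", 5: "fixed32"}

first_byte = 0x7B                       # first byte of the payload above
field_number = first_byte >> 3          # upper bits: field number
wire_type = WIRE_TYPES[first_byte & 7]  # lowest 3 bits: wire type
print(field_number, wire_type)          # 15 start-group
```

So the very first byte already claims "start group, field 15", which is how a '{' character misreads as protobuf.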

There are a lot of surprising things about this:

  1. Your schema doesn't use groups (in fact, it is now hard to even find the existence of groups in the documentation), so... none of this looks right
  2. An end-group token is always required to match the last start-group field number, but it doesn't
  3. Fields in a single level are usually (although as a "should", not a "must") written in numerical order
  4. You don't declare fields 12 or 13
  5. Your field 6 is of the wrong type - we expect fixed64 here, but got varint

So: without doubt: this data is... not what you think it is. It is certainly not valid protobuf binary. Without knowing how the data is being stored, we can only guess, but on a hunch: let's try decoding it as UTF-8 and see what it looks like:

{"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}

Or (formatted):

{
  "SymbolName": "\u0641\u062F\u0631",
  "SymbolIsin": "IRo3pzAZ0002",
  "Date": "1400/12/15",
  "Time": "08:00-12:00",
  "MinPrice": 17726,
  "MaxPrice": 21666,
  "Share": 1000,
  "Show": false,
  "Operation": 0,
  "Id": "100d8e0b54154e9d902054bff193e875",
  "CreateDateTime": "2022-02-26T09:47:20.0134757+03:30"
}
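The UTF-8 check itself can be sketched as follows (Python for brevity; only the first few bytes of the hex dump above are used):

```python
# First bytes of the hex dump from the question
hex_dump = "7B 22 53 79 6D 62 6F 6C 4E 61 6D 65 22 3A"
data = bytes.fromhex(hex_dump.replace(" ", ""))
print(data.decode("utf-8"))  # {"SymbolName":
```

Every byte lands in printable ASCII, which is a strong hint that the payload is text, not a binary wire format.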

Whoops! You've written the data as JSON, and then tried to decode it as binary protobuf. Decode it as JSON instead, and you should be fine. If it was written with the protobuf JSON API: decode it with the protobuf JSON API.
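A minimal sketch of that fix, in Python for brevity and with a trimmed, hypothetical payload (in the .NET consumer, a JSON deserializer — or Google.Protobuf's JsonParser, if the producer used the protobuf JSON API — plays this role):

```python
import json

# Trimmed sample payload, as the raw bytes would arrive from Kafka
payload = b'{"SymbolIsin":"IRo3pzAZ0002","MinPrice":17726,"Show":false,"Operation":0}'

# Decode as UTF-8 text, then parse as JSON rather than binary protobuf
item = json.loads(payload.decode("utf-8"))
print(item["SymbolIsin"], item["MinPrice"])  # IRo3pzAZ0002 17726
```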
