Kafka CONNECT|无法反序列化主题的数据|检索id|Subject的Avro键/值架构版本时出错错误代码:40401 [英] Kafka connect | Failed to deserialize data for topic | Error retrieving Avro key / value schema version for id | Subject not found error code: 40401

查看:20
本文介绍了Kafka CONNECT|无法反序列化主题的数据|检索id|Subject的Avro键/值架构版本时出错错误代码:40401的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

首先感谢@OneCricketeer到目前为止的支持。到目前为止,我已经尝试了这么多配置,我不知道还可以尝试什么。

使用合流connect-standalone worker.properties sink.properties访问外部流。

连接正常,我可以看到已加载偏移量:

INFO[MY_MYSQL_SINK|TASK-0][消费者客户端ID=连接器-消费者-MY_MYSQL_SINK-0,GroupID=CONNECT-MY_MY_SINK]将分区gamerboot.gamer.master.workouts.clubs.spieleranalyse-1的偏移量设置为提交的偏移量提取位置{ 偏移量=2225,偏移量纪元=Optional.Empty,currentLeader=LeaderAndEpoch{leader=Optional[kafka8.pro.someurl.net:9093(id:8机架:空)],纪元=0}}(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)

但之后当新消息传入时我会收到一个错误:

错误[MY_MYSQL_SINK|TASK-0]WorkerSinkTASK{id=MY_MYSQL_SINK-0}转换主题分区1中偏移量为2225、时间戳为1641459346507的消息键时出错:无法将主题gamerboot.gamer.master.workouts.clubs.spieleranalyse的数据反序列化为AVRO:
原因:org.apache.kafka.common.errors.SerializationException:检索ID的Avro密钥架构版本时出错422

原因:找不到io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:主题;错误代码:40401

我不明白。

work.properties:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

sink.properties

#key.converter.enhanced.avro.schema.support=true
#key.converter=org.apache.kafka.connect.storage.StringConverter

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#value.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-reg.pro.someurl.net

#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
#key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
#value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

#pk.mode=record_key
#pk.fields=

因为MySQL中没有设置主键,所以我想记录流中的所有内容。

正如它所说的,检索ID 422的Avro密钥架构版本时出错我可以看到以下内容:

screenshot_subject_id

请不要奇怪,因为它说的是JSON,这只是我的ChromePlugin,它将其解释为json。 价值也是如此。我还尝试了sink.properties中的每一种组合,这是在那里注释的。 我还可以卷曲最新的键和值模式(如):

curl-s https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest|jq

{
  "type": "record",
  "name": "ClubWorkoutKey",
  "namespace": "com.ad.gamerboot.kafka.models.workouts",
  "fields": [
    {
      "name": "playerId",
      "type": "string"
    },
    {
      "name": "tagId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

当我在sink.properties中为key.Converter和value.Converter输入字符串Converter时,它更进一步。但在我看来,肯定是错的,因为阿夫罗是从这里传过来的。如果使用的是字符串,那么就会出现其他问题,我必须设置一个PK并打开删除等操作。

感谢您的支持。

*编辑:

所以,有人给了我:

topic = gamerboot.gamer.master.workouts.clubs.spieleranalyse

schema.url = https://schema-reg.pro.someurl.net

以及:架构ID url:

 https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.workouts-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/schema

和:

https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest

对我来说,这就像一个谜,我20天前就开始玩卡夫卡了。从那里我试了试URL,找到了我为主题发布的URL:

对于键:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey/versions/latest/

架构:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKey","version":1,"id":422,"schema":"{"type":"record","name":"ClubWorkoutKey","namespace":"com.ad.gamerboot.kafka.models.workouts","fields":[{"name":"playerId","type":"string"},{"name":"tagId","type":["null","string"],"default":null}]}"}

对于值:https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue/versions/latest/

https://schema-reg.pro.someurl.net/subjects/gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue/versions/latest/

架构:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutKickValue","version":1,"id":423,"schema":"{"type":"record","name":"ClubWorkoutKickValue","namespace":"com.ad.gamerboot.kafka.models.workouts","fields":[{"name":"playerId","type":"string"},{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"}},{"name":"tagId","type":["null","string"],"default":null},{"name":"ballSpeed","type":["null","int"],"default":null},{"name":"ballSpeedFloat","type":["null","float"],"default":null},{"name":"ballSpeedZone","type":{"type":"enum","name":"BallSpeedZone","symbols":["COLD","MEDIUM","HOT","FIRE","INVALID"]}},{"name":"confidence","type":["null","int"],"default":null},{"name":"ingestionTime","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}]}"}

和:{"subject":"gamerboot.gamer.master.club-com.ad.gamerboot.kafka.models.workouts.WorkoutPlayerMotionValue","version":1,"id":424,"schema":"{"type":"record","name":"ClubWorkoutPlayerMotionValue","namespace":"com.ad.gamerboot.kafka.models.workouts","fields":[{"name":"playerId","type":"string"},{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"}},{"name":"absoluteDistance","type":"float"},{"name":"averageSpeed","type":"float"},{"name":"peakSpeed","type":"float"},{"name":"tagId","type":["null","string"],"default":null},{"name":"installationId","type":["null","string"],"default":null},{"name":"averageSpeedZone","type":["null",{"type":"enum","name":"AverageSpeedZone","symbols":["SPRINT","HIGH_SPEED_RUN","RUN","JOG","WALK","STAND","INVALID"]}],"default":null,"aliases":["speedZone"]},{"name":"peakSpeedZone","type":["null",{"type":"enum","name":"PeakSpeedZone","symbols":["SPRINT","HIGH_SPEED_RUN","RUN","JOG","WALK","STAND","INVALID"]}],"default":null},{"name":"ingestionTime","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}]}"}

MySQL表:

+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| Field            | Type                                                                 | Null | Key | Default | Extra |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+
| playerid         | varchar(100)                                                         | YES  |     | NULL    |       |
| timestamp        | mediumtext                                                           | YES  |     | NULL    |       |
| absoluteDistance | float                                                                | YES  |     | NULL    |       |
| avarageSpeed     | float                                                                | YES  |     | NULL    |       |
| peakSpeed        | float                                                                | YES  |     | NULL    |       |
| tagId            | varchar(50)                                                          | YES  |     | NULL    |       |
| installationId   | varchar(100)                                                         | YES  |     | NULL    |       |
| averageSpeedZone | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| peakSpeedZone    | enum('SPRINT','HIGH_SPEED_RUN','RUN','JOG','WALK','STAND','INVALID') | YES  |     | NULL    |       |
| ballSpeed        | int(11)                                                              | YES  |     | NULL    |       |
| ballSpeedFloat   | float                                                                | YES  |     | NULL    |       |
| ballSpeedZone    | enum('COLD','MEDIUM','HOT','FIRE','INVALID')                         | YES  |     | NULL    |       |
| confidence       | int(11)                                                              | YES  |     | NULL    |       |
| ingestionTime    | mediumtext                                                           | YES  |     | NULL    |       |
+------------------+----------------------------------------------------------------------+------+-----+---------+-------+

MySQL中需要的数据:

+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| playerid                             | timestamp     | absoluteDistance | avarageSpeed | peakSpeed | tagId          | installationId | averageSpeedZone | peakSpeedZone | ballSpeed | ballSpeedFloat | ballSpeedZone | confidence | ingestionTime |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641495873505 |          5.76953 |       1.1543 |   1.22363 | 0104FLHBN009XD | null           | WALK             | WALK          |      NULL |           NULL | NULL          |       NULL | 1641496586458 |
| 59a70d45-5c00-4bb6-966d-b961b78ef5c1 | 1641484677624 |             NULL |         NULL |      NULL | 0104FLHBN009XD | NULL           | NULL             | NULL          |        37 |        37.0897 | COLD          |         77 | 1641484896747 |
+--------------------------------------+---------------+------------------+--------------+-----------+----------------+----------------+------------------+---------------+-----------+----------------+---------------+------------+---------------+

来自avro-控制台的数据库条目数据如下:

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641484677624,"tagId":{"string":"0104FLHBN009XD"},"ballSpeed":{"int":37},"ballSpeedFloat":{"float":37.08966},"ballSpeedZone":"COLD","confidence":{"int":77},"ingestionTime":{"long":1641484896747}}

{"playerId":"59a70d45-5c00-4bb6-966d-b961b78ef5c1","timestamp":1641495873505,"absoluteDistance":5.7695312,"averageSpeed":1.1542969,"peakSpeed":1.2236328,"tagId":{"string":"0104FLHBN009XD"},"installationId":null,"averageSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.AverageSpeedZone":"WALK"},"peakSpeedZone":{"com.ad.gamerboot.kafka.models.workouts.PeakSpeedZone":"WALK"},"ingestionTime":{"long":1641496586458}}

这是一个全新的实际合流安装。几个小时前,我将avro更新为:kafka-Connect-avro-Converter:7.0.1

推荐答案

公司更改了有关RecordNameStrategy的架构。现在一切正常。

谢谢

这篇关于Kafka CONNECT|无法反序列化主题的数据|检索id|Subject的Avro键/值架构版本时出错错误代码:40401的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆