Kafka connector and Schema Registry - Error Retrieving Avro Schema - Subject not found

Question

I have a topic that will eventually have lots of different schemas on it. For now it just has the one. I've created a connect job via REST like this:

{
 "name":"com.mycompany.sinks.GcsSinkConnector-auth2",
 "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting":"bar",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry-service:8081",
    "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "group.id":"account-archiver"
 }
}

I then push a message to that topic with a string key and an Avro-serialized payload. If I inspect the topic in the control center I see the correctly deserialized data coming through. Looking at the output from the connect instance, though, I see this in the logs:

ERROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

You can see from here that there are two related issues:

  • Error retrieving Avro schema for id 7
  • Subject not found.; error code: 40401

What bugs me is that I've specified the strategy to be RecordNameStrategy, which I think should use the magic byte to go and get the schema rather than the topic name, but it errors with Subject not found. I'm not sure if it's actually looking for a subject name or getting a schema by ID. Either way, by ssh-ing to the connect instance and doing a curl to http://schema-registry-service:8081/schemas/ids/7, I do get the schema returned. There is some additional logging above this stack trace which, disappointingly, looks like it's still using the wrong name strategy:
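To make the "magic byte" framing concrete: a Confluent-serialized message carries a one-byte magic marker (0x00) and a four-byte big-endian schema id before the Avro body, which is where the id 7 in the error comes from. A minimal sketch of parsing that header (the payload bytes here are made up for illustration):

```python
import struct

def parse_confluent_header(payload: bytes):
    """Split a Confluent-framed Avro message into (schema_id, avro_bytes).

    Wire format: 1 magic byte (0x00) + 4-byte big-endian schema id,
    then the Avro-encoded body.
    """
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not a Confluent-framed message")
    schema_id = struct.unpack(">I", payload[1:5])[0]
    return schema_id, payload[5:]

# A message whose schema was registered under id 7:
framed = b"\x00" + struct.pack(">I", 7) + b"<avro body>"
schema_id, body = parse_confluent_header(framed)
print(schema_id)  # 7
```

This is why `curl /schemas/ids/7` returning a schema is the right sanity check: the id really is embedded in every message, independent of any subject name.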

INFO AvroConverterConfig values:
    schema.registry.url = [http://schema-registry-service:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = false
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
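For context, the two strategies derive the Schema Registry subject quite differently; the sketch below models that derivation in plain Python (the record name `com.mycompany.AuthEvent` is a made-up example, not taken from the question):

```python
def topic_name_strategy(topic: str, is_key: bool) -> str:
    # Default strategy: the subject is derived from the topic name alone.
    return f"{topic}-{'key' if is_key else 'value'}"

def record_name_strategy(record_fullname: str) -> str:
    # Subject is the fully-qualified Avro record name; the topic is ignored.
    return record_fullname

# With the default in effect, the deserializer looks up this subject:
print(topic_name_strategy("auth.events", is_key=False))  # auth.events-value

# With RecordNameStrategy it would instead look up the record's full name:
print(record_name_strategy("com.mycompany.AuthEvent"))  # com.mycompany.AuthEvent
```

So if the converter falls back to the default, it looks for a subject named after the topic, which may not exist when the schema was registered under a record-name subject.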

Does anyone have any clues about how to resolve this? I'm using the following images:

  • confluentinc/cp-kafka-connect:5.2.0
  • confluentinc/cp-kafka:5.1.0

Thanks

Answer

In the trace, lookUpSubjectVersion means it tried to do a lookup under /subjects/:name/versions for each ID listed there, and could not find schemaId=7 (note: not version=7). It's not too clear from the logs what :name it is trying to use here, but if that subject isn't found, you'll get your Subject not found error. If my PR were accepted, the subject name would be clearer.

I believe this might be due to using RecordNameStrategy. Looking at the PR for that property, I gathered it was really only tested against the producer/consumer code, and not thoroughly within the Connect API, as compared to the default behaviour of TopicNameStrategy.

And as you can see, that is what it actually tried to use:

value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

But looking more closely, I think you might have configured it wrong.

Similar to how you have value.converter.schema.registry.url, you would actually need to set value.converter.value.subject.name.strategy instead.
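Assuming that diagnosis is right, the corrected connector config would look roughly like this (a sketch only; the key.deserializer and group.id entries from the original are dropped because Kafka Connect manages its own consumer settings and likely ignores them):

```json
{
  "name": "com.mycompany.sinks.GcsSinkConnector-auth2",
  "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting": "bar",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-service:8081",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
  }
}
```

The key change is the `value.converter.` prefix on the strategy property, so the setting reaches the AvroConverter instead of being silently dropped.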
