Kafka connector and Schema Registry - Error Retrieving Avro Schema - Subject not found
Question
I have a topic that will eventually have lots of different schemas on it. For now it just has the one. I've created a connect job via REST like this:
{
"name":"com.mycompany.sinks.GcsSinkConnector-auth2",
"config": {
"connector.class": "com.mycompany.sinks.GcsSinkConnector",
"topics": "auth.events",
"flush.size": 3,
"my.setting":"bar",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry-service:8081",
"value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
"group.id":"account-archiver"
}
}
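For context, a connector config like the one above is submitted to the Kafka Connect REST API as JSON. A minimal sketch in Python of how that submission looks (the Connect worker URL is an assumption, and the `requests.post` call is commented out so the snippet runs without a live cluster):

```python
import json

# Connector config as it would be POSTed to the Connect REST API (/connectors).
connector = {
    "name": "com.mycompany.sinks.GcsSinkConnector-auth2",
    "config": {
        "connector.class": "com.mycompany.sinks.GcsSinkConnector",
        "topics": "auth.events",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry-service:8081",
        # Exactly as in the question -- note this key has no converter prefix:
        "value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
    },
}

payload = json.dumps(connector)
# import requests  # assumed Connect worker address below
# requests.post("http://connect:8083/connectors",
#               data=payload, headers={"Content-Type": "application/json"})
print(payload)
```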
I then push a message to that topic with a string key and an avro serialized payload. If I inspect the topic in the control center I see the correctly deserialized data coming through. Looking at the output from the connect instance though I see this in the logs
ERROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
You can see from here that there are two related issues:
- Error retrieving Avro schema for id 7
- Subject not found.; error code: 40401
What bugs me is that I've specified the strategy to be RecordNameStrategy which I think should use the magic byte to go and get the schema as opposed to the topic name, but it errors on Subject not found. I'm not sure if it's actually looking for a subject name or getting a schema by the ID.
Either way by ssh-ing to the connect instance and doing a curl to
http://schema-registry-service:8081/schemas/ids/7
I do get the schema returned.
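The "magic byte" mentioned above refers to the Confluent wire format: each serialized value starts with a magic byte `0x00` followed by the 4-byte big-endian schema ID, which is how the deserializer knows to fetch schema 7 from the registry. A small illustrative sketch of decoding that header (not the actual client code, and the Avro body bytes are a placeholder):

```python
import struct

def parse_confluent_header(message: bytes):
    """Split a Confluent-framed Avro message into (schema_id, payload).

    Wire format: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro body.
    """
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return schema_id, message[5:]

# A message framed with schema ID 7, as in the error above:
framed = b"\x00" + struct.pack(">I", 7) + b"avro-body-bytes"
schema_id, body = parse_confluent_header(framed)
print(schema_id)  # 7
```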
There is some additional logging above this stack trace which disappointingly looks like it's still using the wrong name strategy:
INFO AvroConverterConfig values:
schema.registry.url = [http://schema-registry-service:8081]
basic.auth.user.info = [hidden]
auto.register.schemas = false
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
Does anyone have any clues about how to resolve this? I'm using the following images:
- confluentinc/cp-kafka-connect:5.2.0
- confluentinc/cp-kafka:5.1.0
Thanks
Recommended Answer
In the trace, lookUpSubjectVersion means it tried to do a lookup under /subjects/:name/versions for each ID listed there, then could not find schemaId=7 (note: not version=7). It's not too clear from the logs what :name it is trying to use here, but if that isn't found, then you'll get your Subject not found error. If my PR were accepted, the subject name would be clearer.
I believe this might be due to using RecordNameStrategy. Looking at the PR for that property, I gathered it was really only tested against the producer/consumer code, and not thoroughly within the Connect API, as compared to the default behaviour of TopicNameStrategy, which you can see it tried to use:
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
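The difference between the two strategies can be sketched as follows: TopicNameStrategy derives the subject from the topic alone, while RecordNameStrategy uses the record's fully qualified name. This is a simplified Python rendering of the behaviour (the real implementations live in `io.confluent.kafka.serializers.subject`, and the record name `com.mycompany.AuthEvent` is hypothetical):

```python
def topic_name_strategy(topic: str, record_name: str, is_key: bool) -> str:
    # Default: subject is derived from the topic alone.
    return topic + ("-key" if is_key else "-value")

def record_name_strategy(topic: str, record_name: str, is_key: bool) -> str:
    # Subject is the record's fully qualified name; the topic is ignored,
    # which is what allows many record types on one topic.
    return record_name

# With the question's topic and a hypothetical record name:
print(topic_name_strategy("auth.events", "com.mycompany.AuthEvent", False))
# auth.events-value
print(record_name_strategy("auth.events", "com.mycompany.AuthEvent", False))
# com.mycompany.AuthEvent
```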
But looking more closely, I think you might have configured it wrong.
Similar to how you have value.converter.schema.registry.url, you would actually need to set value.converter.value.subject.name.strategy instead.
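In other words, converter-level settings need the converter prefix to be passed through to the converter. The relevant part of the config would then look like this (a sketch based on the answer above):

```json
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-service:8081",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
```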