Kafka connector and Schema Registry - Error Retrieving Avro Schema - Subject not found

Problem description

I have a topic that will eventually have lots of different schemas on it. For now it just has the one. I've created a connect job via REST like this:

{
 "name":"com.mycompany.sinks.GcsSinkConnector-auth2",
 "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting":"bar",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry-service:8081",
    "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "group.id":"account-archiver"

 }
}
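The connector above is created through Kafka Connect's REST API (`POST /connectors` with a `name` and a `config` map). Below is a minimal standard-library Python sketch that builds that request without sending it. The worker address `http://kafka-connect:8083` is a hypothetical host (8083 is Connect's default REST port), and only part of the question's config is reproduced.

```python
import json
import urllib.request

# Hypothetical Connect worker address; 8083 is Kafka Connect's default REST port.
CONNECT_URL = "http://kafka-connect:8083/connectors"


def build_create_request(name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_create_request(
    "com.mycompany.sinks.GcsSinkConnector-auth2",
    {
        "connector.class": "com.mycompany.sinks.GcsSinkConnector",
        "topics": "auth.events",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry-service:8081",
    },
)
# To actually submit it against a running worker: urllib.request.urlopen(req)
```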

I then push a message to that topic with a string key and an Avro-serialized payload. If I inspect the topic in Control Center, I see the correctly deserialized data coming through. Looking at the output from the Connect instance, though, I see this in the logs:

ERROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

You can see from here that there are two related issues:


  • Error retrieving Avro schema for id 7
  • Subject not found.; error code: 40401

What bugs me is that I've specified the strategy to be RecordNameStrategy, which I think should use the magic byte to go and get the schema as opposed to the topic name, but it errors with Subject not found. I'm not sure whether it's actually looking for a subject name or getting a schema by the ID. Either way, by ssh-ing to the Connect instance and doing a curl to http://schema-registry-service:8081/schemas/ids/7, I do get the schema returned. There is some additional logging above this stack trace which, disappointingly, looks like it's still using the wrong name strategy:
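For context on the "magic byte" part: a Confluent-serialized Avro message starts with one magic byte (0), then a 4-byte big-endian schema ID, then the Avro binary body. Reading that header is what yields schema ID 7, and fetching `/schemas/ids/7` works, as the curl shows. The stack trace, however, fails in a later step (`schemaVersion` calling `lookUpSubjectVersion`), which needs a subject name. A minimal sketch of the header parsing, with placeholder payload bytes:

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def parse_confluent_header(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Avro message into (schema_id, avro_payload).

    Layout: 1 magic byte (0), a 4-byte big-endian schema ID, then Avro binary.
    """
    if len(message) < 5:
        raise ValueError("message too short for Confluent wire format")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message[5:]


# Header pointing at schema id 7, followed by placeholder (not real Avro) bytes.
msg = bytes([0, 0, 0, 0, 7]) + b"\x00\x01"
schema_id, payload = parse_confluent_header(msg)
print(schema_id)  # 7
```

Note that the header carries only the ID; nothing in it says which subject the schema was registered under.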

INFO AvroConverterConfig values:
    schema.registry.url = [http://schema-registry-service:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = false
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

Does anyone have any clues about how to resolve this? I'm using the following images:


  • confluentinc/cp-kafka-connect:5.2.0
  • confluentinc/cp-kafka:5.1.0

Thanks

Recommended answer

In the trace, lookUpSubjectVersion means it tried to do a lookup under /subjects/:name/versions for each ID listed there and could not find schemaId=7 (note: not version=7). The logs don't make it clear which :name it is trying to use here, but if that subject isn't found, you get your "Subject not found" error. If my PR were accepted, the subject name would be clearer.
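To illustrate the failure mode described above, here is a deliberately simplified, hypothetical in-memory model of a subject/version lookup. It is keyed by schema ID for clarity and does not reproduce the real REST calls. The registry contents and the record name `com.mycompany.AuthEvent` are assumptions: the schema is presumed registered only under a record-name subject, so looking it up under a topic-name subject fails with 40401 even though ID 7 exists.

```python
class SubjectNotFound(Exception):
    """Stands in for Schema Registry error code 40401 ("Subject not found")."""
    code = 40401


class SchemaNotInSubject(Exception):
    """The subject exists but the schema ID is not registered under it."""


# Hypothetical registry contents: subject -> {schema_id: version}.
REGISTRY = {
    "com.mycompany.AuthEvent": {7: 1},
}


def look_up_subject_version(subject: str, schema_id: int) -> int:
    """Simplified model: find the version of schema_id under subject."""
    versions = REGISTRY.get(subject)
    if versions is None:
        raise SubjectNotFound(f"Subject not found: {subject}")
    if schema_id not in versions:
        raise SchemaNotInSubject(f"schema {schema_id} not under {subject}")
    return versions[schema_id]


print(look_up_subject_version("com.mycompany.AuthEvent", 7))  # 1
try:
    look_up_subject_version("auth.events-value", 7)
except SubjectNotFound as err:
    print(err.code, err)  # 40401 Subject not found: auth.events-value
```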

I believe this might be due to using RecordNameStrategy. Looking at the PR for that property, I gathered it was really only tested against the producer/consumer code, and not thoroughly within the Connect API, as compared to the default behaviour of TopicNameStrategy, which you can see it tried to use:

value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
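Why the strategy matters: each built-in strategy derives a different subject name. TopicNameStrategy uses `<topic>-key` or `<topic>-value`, RecordNameStrategy uses the record's fully qualified name, and TopicRecordNameStrategy combines the two. A minimal sketch of those rules, where the record name `com.mycompany.AuthEvent` is hypothetical. Assuming the producer registered the schema with RecordNameStrategy, a converter still on TopicNameStrategy would look under `auth.events-value`, a subject that may never have been created:

```python
def subject_name(strategy: str, topic: str, record_fullname: str,
                 is_key: bool = False) -> str:
    """Subject naming rules of Confluent's three built-in strategies."""
    if strategy == "TopicNameStrategy":
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "RecordNameStrategy":
        return record_fullname
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")


record = "com.mycompany.AuthEvent"  # hypothetical Avro record full name
print(subject_name("TopicNameStrategy", "auth.events", record))   # auth.events-value
print(subject_name("RecordNameStrategy", "auth.events", record))  # com.mycompany.AuthEvent
```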

But looking more closely, I think you might have configured it wrong.

Similar to how you have value.converter.schema.registry.url, you would actually need to set value.converter.value.subject.name.strategy instead.
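Applied to the question's connector, the change is just the key prefix. Below is a sketch that builds the corrected REST payload in Python. Some of the original settings (`my.setting`, `key.deserializer`, `group.id`) are left out for brevity, and the values are written as strings. After redeploying, one would expect the `AvroConverterConfig` log block to report RecordNameStrategy for `value.subject.name.strategy`, though that is worth confirming on your own worker.

```python
import json

# Converter-specific settings carry the "value.converter." prefix, the same
# way value.converter.schema.registry.url already does in the question.
config = {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-service:8081",
    # Was "value.subject.name.strategy", which the converter never sees.
    "value.converter.value.subject.name.strategy":
        "io.confluent.kafka.serializers.subject.RecordNameStrategy",
}

payload = {"name": "com.mycompany.sinks.GcsSinkConnector-auth2", "config": config}
print(json.dumps(payload, indent=2))
```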
