Apache Kafka JDBC Connector - SerializationException: Unknown magic byte
Problem description
We are trying to write the values from a topic back to a Postgres database using the Confluent JDBC Sink Connector.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
We can read the values in the console using:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name
The schema exists and the values are correctly deserialized by kafka-avro-console-consumer, since it reports no error, but the connector fails with these errors:
{
  "name": "datawarehouse_sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "x.x.x.x:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "x.x.x.x:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
    }
  ],
  "type": "sink"
}
The final error is:
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
The schema is registered in the Schema Registry.
Is the problem in the connector's configuration file?
Recommended answer
The error org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
means that a message on the topic was not valid Avro in the Schema Registry wire format that the AvroConverter expects, so it could not be deserialised. There are several possible reasons:
Some messages are Avro, but others are not. If this is the case, you can use the error handling capabilities in Kafka Connect to skip the invalid messages with configuration like this:
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true
The value is Avro but the key isn't. If this is the case, use the appropriate key.converter for the key's actual format.
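To tell which of the two cases applies, it can help to look at the raw bytes of a record's key and value. The Schema Registry serializers prefix each payload with a magic byte of 0 followed by a 4-byte big-endian schema ID, and the deserializer raises "Unknown magic byte!" when the first byte is anything else. The following is a minimal sketch of that check, not part of the original answer; the function name inspect_confluent_payload is hypothetical, and how you obtain the raw bytes (for example, from a consumer that returns undecoded byte arrays) is left as an assumption.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Schema Registry wire format


def inspect_confluent_payload(raw):
    """Return (looks_framed, schema_id) for the raw bytes of a key or value.

    Hypothetical helper: it only checks the 5-byte header, not whether
    the remaining bytes are a valid Avro body.
    """
    # A framed payload needs 1 magic byte plus a 4-byte schema ID.
    if raw is None or len(raw) < 5:
        return (False, None)
    # ">BI" = big-endian, one unsigned byte followed by one unsigned 32-bit int.
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != MAGIC_BYTE:
        return (False, None)
    return (True, schema_id)


# Example: a framed payload with schema ID 42, and a plain string key.
framed = b"\x00" + struct.pack(">I", 42) + b"avro-body"
print(inspect_confluent_payload(framed))        # (True, 42)
print(inspect_confluent_payload(b"plain-key"))  # (False, None)
```

If the values pass this check but the keys do not, the second case above applies and only key.converter needs to change; if some values fail, the error-tolerance settings from the first case are the relevant ones.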
Further reading: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/