Apache Kafka JDBC Connector - SerializationException: Unknown magic byte


Problem Description

We are trying to write the values from a topic back to a Postgres database using the Confluent JDBC Sink Connector:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

We can read the values in the console using:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name
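As a further diagnostic (not part of the original question), the console consumer can also print the keys, which helps check whether the keys, and not just the values, are valid Avro; `print.key=true` is a standard console-consumer property:

```
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic topic_name \
  --property print.key=true
```

If this command fails with the same "Unknown magic byte" error, the keys were not written with the Avro serialiser.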

The schema exists and the values are correctly deserialised by kafka-avro-console-consumer (it reports no errors), but the connector reports these errors:

  {
  "name": "datawarehouse_sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "x.x.x.x:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "x.x.x.x:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
    }
  ],
  "type": "sink"
}

The final error is:

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
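For context on what this error checks: the Confluent Avro serialisers prefix every payload with a 5-byte header, a magic byte 0x00 followed by the schema ID as a 4-byte big-endian integer, and the deserialiser throws "Unknown magic byte!" when the first byte is anything else. A minimal Python sketch of that framing check (the function name is hypothetical, not part of any Confluent library):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format: first byte must be 0x00


def inspect_confluent_header(payload: bytes) -> int:
    """Return the schema registry ID embedded in a Confluent-framed message,
    or raise if the payload does not start with the expected magic byte."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    # Bytes 1-4 hold the schema ID as a big-endian unsigned int.
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id


# A Confluent-framed message referencing schema ID 42:
framed = b"\x00\x00\x00\x00\x2a" + b"<avro binary body>"
print(inspect_confluent_header(framed))  # -> 42

# A plain (non-Avro) message fails the check, just as the connector did:
try:
    inspect_confluent_header(b"plain text message")
except ValueError as exc:
    print(exc)  # -> Unknown magic byte!
```

This is why a topic containing even one non-Avro message (or non-Avro keys) makes the AvroConverter fail while an Avro producer never notices.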

The schema is registered in the schema registry.

Does the problem sit with the configuration file of the connector?

Recommended Answer

The error org.apache.kafka.common.errors.SerializationException: Unknown magic byte! means that a message on the topic was not valid Avro and could not be deserialised. There are several reasons this could be:

  1. Some messages are Avro, but others are not. If this is the case, you can use the error handling capabilities in Kafka Connect to ignore the invalid messages, using config like this:

"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true

  2. The value is Avro but the key isn't. If this is the case, then use the appropriate key.converter.

    More reading: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
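For case 2, if the keys were written with a plain serialiser (for example Kafka's default string serialiser) rather than the Avro serialiser, the matching sink config would swap only the key converter while leaving the Avro value converter in place; StringConverter ships with Kafka Connect:

```
key.converter=org.apache.kafka.connect.storage.StringConverter
```

The two `key.converter.*` schema-registry lines in the original config would then no longer be needed.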

