Kafka JDBC Sink Connector gives a Null Pointer Exception for a message with a schema having an optional field
Question

The Kafka JDBC Sink Connector gives a Null Pointer Exception for a message with a schema having an optional field, here 'parentId'. Have I missed anything? I am using the out-of-the-box JsonConverter and JDBC Sink Connector.

A message on the Kafka topic is:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "id",
        "type": "string"
      },
      {
        "field": "type",
        "type": "string"
      },
      {
        "field": "eventId",
        "type": "string"
      },
      {
        "field": "parentId",
        "type": "string",
        "optional": true
      },
      {
        "field": "created",
        "type": "int64",
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1
      }
    ]
  },
  "payload": {
    "id": "asset-1",
    "type": "parcel",
    "eventId": "evt-1",
    "created": 1501834166000
  }
}
The connector has these properties:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=admin
topics=asset-topic
tasks.max=1
batch.size=1
auto.evolve=true
connection.user=admin
auto.create=true
connection.url=jdbc:postgresql://postgres-db:5432/fabricdb
value.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_value
pk.fields=id
But the JDBC Sink Connector is failing on the optional field parentId with:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:698)
at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:742)
at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:361)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Answer

According to the JsonConverter source code, the value of a field that is marked as optional still has to be present in the message payload. You can see this in the JsonConverter class, in the method that converts a JSON value to a Connect Object:
private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
    final Schema.Type schemaType;
    if (schema != null) {
        schemaType = schema.type();
        if (jsonValue.isNull()) {
            if (schema.defaultValue() != null)
                return schema.defaultValue(); // any logical type conversions should already have been applied
            if (schema.isOptional())
                return null;
            throw new DataException("Invalid null value for required " + schemaType + " field");
        }
    }
    // ...
}
To summarize: if the schema is present, which in your case is:
{
  "field": "parentId",
  "type": "string",
  "optional": true
}
then the value has to be present in the message payload. It can be null, but the key itself has to be there.
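Applied to the message from the question, the fix is to carry parentId in the payload with an explicit null rather than omitting it:

```json
"payload": {
  "id": "asset-1",
  "type": "parcel",
  "eventId": "evt-1",
  "parentId": null,
  "created": 1501834166000
}
```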
If you look at the code from the other side, the code responsible for serialization, it adds a NullNode for a null reference:
private static JsonNode convertToJson(Schema schema, Object logicalValue) {
    if (logicalValue == null) {
        if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
            return null;
        if (schema.defaultValue() != null)
            return convertToJson(schema, schema.defaultValue());
        if (schema.isOptional())
            return JsonNodeFactory.instance.nullNode();
        throw new DataException("Conversion error: null value for field that is required and has no default value");
    }
    // ...
}
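To make the failure mode concrete, here is a minimal, self-contained sketch (not the real Kafka Connect code; `NULL_NODE` is a stand-in for Jackson's `NullNode`, and `convertToConnect` is a simplified imitation of the converter's null handling) showing why an explicit JSON null is accepted while a missing field blows up: an absent field arrives as a Java `null`, and the real converter calls `jsonValue.isNull()` on it without a null check.

```java
public class OptionalFieldDemo {
    // Sentinel standing in for Jackson's NullNode (an explicit JSON null).
    static final Object NULL_NODE = new Object();

    // Simplified imitation of JsonConverter.convertToConnect's null handling.
    // Like the real method, it dereferences jsonValue first, so a field that is
    // simply absent from the payload (jsonValue == null) throws an NPE.
    static Object convertToConnect(boolean optional, Object jsonValue) {
        if (jsonValue.equals(NULL_NODE)) { // NPE here when the field is missing
            if (optional)
                return null;               // explicit null on an optional field is fine
            throw new RuntimeException("Invalid null value for required field");
        }
        return jsonValue;
    }

    public static void main(String[] args) {
        // Explicit null for an optional field: converts cleanly to null.
        System.out.println(convertToConnect(true, NULL_NODE)); // prints "null"

        // Field absent from the payload entirely: NullPointerException,
        // which is what the connector's error handler wraps and reports.
        try {
            convertToConnect(true, null);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException"); // prints "NullPointerException"
        }
    }
}
```

This mirrors the stack trace above: the NPE originates inside `JsonConverter.convertToConnect`, before any optional/default logic can run, because that logic only applies once there is a JSON node to inspect.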