Kafka Elasticsearch连接器时间戳 [英] Kafka Elasticsearch Connector Timestamps
问题描述
我看到对此进行了几次讨论例如,但我认为由于Elasticsearch中的重大更改,解决方案已过时。
I can see this has been discussed a few times here for instance but I think the solutions are out of date due to breaking changes in Elasticsearch.
我正在尝试转换长/历元字段在我的Kafka主题中的Json中,输入到通过连接器推送的Elasticsearch日期类型。
I'm trying to convert a long/epoch field in my Json in my Kafka topic to an Elasticsearch date type which is pushed through the connector.
当我尝试添加动态映射时,我的Kafka connect更新失败,因为Im试图将两个映射应用于字段_doc和kafkaconnect。我相信这是版本6的重大变化,我相信每个索引只能有一个映射。
When I try to add a dynamic mapping, my Kafka connect updates fail because Im trying to apply two mappings to a field, _doc and kafkaconnect. This was a breaking change around version 6 I believe where you can only have one mapping per index.
{
"index_patterns": [ "depart_details" ],
"mappings": {
"dynamic_templates": [
{
"scheduled_to_date": {
"match": "scheduled",
"mapping": {
"type": "date"
}
}
}
]
}}
我现在专注于尝试通过将字段更改为时间戳记,时间或日期。
I've now focussed on trying to translate the message at source in the connector by changing the field to a timestamp, time or date.
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field" : "scheduled",
"transforms.TimestampConverter.target.type": "Timestamp"
但是,我尝试通过此转换器发送的任何消息都会失败,
However, any messages I try to send through this transformer fail with
Caused by: org.apache.kafka.connect.errors.DataException: Java class class java.util.Date does not have corresponding schema type.
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:668)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:285)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:270)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)
似乎很普通需要做,但我不看不到如何通过版本7中的此连接器将日期或时间字段输入Elastic?
Seems like a really common thing to need to do, but I don't see how to get a date or time field into Elastic through this connector in version 7?
推荐答案
Confluent文档指出ES ES连接器 ES当前不支持 7。
The Confluent documentation states that the ES connector is currently not supported with ES 7.
根据此问题,将 type.name = kafkaconnect
更改为 type.name = _doc
可能就足够了在您的连接器配置中。
According to this issue, it might suffice to change type.name=kafkaconnect
to type.name=_doc
in your connector configuration.
这篇关于Kafka Elasticsearch连接器时间戳的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!