Kafka Elasticsearch连接器时间戳 [英] Kafka Elasticsearch Connector Timestamps

查看:318
本文介绍了Kafka Elasticsearch连接器时间戳的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我看到对此进行了几次讨论例如,但我认为由于Elasticsearch中的重大更改,解决方案已过时。

I can see this has been discussed a few times here for instance but I think the solutions are out of date due to breaking changes in Elasticsearch.

我正在尝试转换长/历元字段在我的Kafka主题中的Json中,输入到通过连接器推送的Elasticsearch日期类型。

I'm trying to convert a long/epoch field in my Json in my Kafka topic to an Elasticsearch date type which is pushed through the connector.

当我尝试添加动态映射时,我的Kafka connect更新失败,因为Im试图将两个映射应用于字段_doc和kafkaconnect。我相信这是版本6的重大变化,我相信每个索引只能有一个映射。

When I try to add a dynamic mapping, my Kafka connect updates fail because Im trying to apply two mappings to a field, _doc and kafkaconnect. This was a breaking change around version 6 I believe where you can only have one mapping per index.

{
    "index_patterns": [ "depart_details" ],
  "mappings": {
    "dynamic_templates": [
      {
        "scheduled_to_date": {
          "match":   "scheduled",
          "mapping": {
            "type": "date"
          }
        }
      } 
    ]
}}

我现在专注于尝试通过将字段更改为时间戳记,时间或日期。

I've now focussed on trying to translate the message at source in the connector by changing the field to a timestamp, time or date.

    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field" : "scheduled",
        "transforms.TimestampConverter.target.type": "Timestamp"

但是,我尝试通过此转换器发送的任何消息都会失败,

However, any messages I try to send through this transformer fail with

Caused by: org.apache.kafka.connect.errors.DataException: Java class class java.util.Date does not have corresponding schema type.
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:668)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
    at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:285)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:270)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)

似乎很普通需要做,但我不看不到如何通过版本7中的此连接器将日期或时间字段输入Elastic?

Seems like a really common thing to need to do, but I don't see how to get a date or time field into Elastic through this connector in version 7?

推荐答案

Confluent文档指出ES ES连接器 ES当前不支持 7。

The Confluent documentation states that the ES connector is currently not supported with ES 7.

根据此问题,将 type.name = kafkaconnect 更改为 type.name = _doc 可能就足够了在您的连接器配置中。

According to this issue, it might suffice to change type.name=kafkaconnect to type.name=_doc in your connector configuration.

这篇关于Kafka Elasticsearch连接器时间戳的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆