尝试使用 Kafka Connect 在 Elasticsearch 中索引 kafka 主题 [英] Trying to index kafka topic in Elasticsearch with Kafka Connect

查看:40
本文介绍了尝试使用 Kafka Connect 在 Elasticsearch 中索引 kafka 主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想将一个主题从 avro 中的 kafka 索引到 elasticsearch 格式,但是我的时间戳字段有问题无法识别elasticsearch 作为日期格式字段.

I want to index a topic from kafka in avro to elasticsearch format but I have problems with my timestamp field to be recognized by elasticsearch as date format field.

我对连接器使用了以下配置.

I have used the following configuration for the connector.

   {
          "name": "es-sink-barchart-10",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081",

        "connection.url": "http://localhost:9200",

        "type.name":"type.name=kafka-connect",

        "topics": "exchange_avro_01",

        "topic.index.map": "exchange_avro_01:exchange_barchart",

        "key.ignore": "true"
     }
    }

原始字段是 bigint 类型,我希望目标字段是具有任何有效格式的日期类型和 elasticsearch.我定义了一个动态模板,尝试通过以下方式解决:

The original field is bigint type and I want the target field to be date type with any valid format with elasticsearch. I have defined a dynamic template to try to solve it in the following way:

curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
{
  "index_patterns": "exchange*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "kafka-connect": {
      "dynamic_templates": [
    {
          "dates": {
        "match_mapping_type": "long",
            "match": "TIME",
            "mapping": {
              "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
            }
          }
        }
      ]
     ,
      "properties": {
          "CLOSE": {
            "type": "double"
          },
         .
         .
         .
        }
      }

    }
  }
}'

当我加载上述连接器时,没有任何内容被索引到 elasticsearch.

When I load the connector described above nothing is indexed to elasticsearch.

有什么帮助吗?

推荐答案

如果你的源代码是一个 bigint,那么它可能是一个 epoch.如果它是一个纪元,那么这将不起作用:

If your source is a bigint then presumably it's an epoch. If it's an epoch, then this won't work:

"mapping": {
      "type": "date",
      "format": "yyyy-MM-dd HH:mm:ss"
        }

因为你告诉 Elasticsearch 日期格式是 yyyy-MM-dd HH:mm:ss(它不是).

because you're telling Elasticsearch that the date format is yyyy-MM-dd HH:mm:ss (which it isn't).

因此,请尝试此操作(暂时省略您的自定义映射;先使其正常工作,然后再将其添加回来):

So instead, try this (omitting your custom mapping for the moment; get this working first and then add that back in):

{
  "index_patterns": "exchange*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "kafka-connect": {
      "dynamic_templates": [
        {
          "dates": {
            "match": "TIME",
            "mapping": {
              "type": "date"
            } } } ] } } }

另请参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection

没有索引到elasticsearch.

nothing is indexed to elasticsearch.

检查 Kafka Connect 工作日志和 Elasticsearch 日志是否有任何错误.

Check the Kafka Connect worker log and the Elasticsearch log for any errors.

这篇关于尝试使用 Kafka Connect 在 Elasticsearch 中索引 kafka 主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆