Confluent Kafka Connect Elasticsearch document ID creation


Question

I am using Confluent to connect my DB and Elasticsearch, and I am getting this exception:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

My kafka-connect-jdbc configuration is:

name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10 
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app

My kafka-connect-elasticsearch configuration is:

name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id

My table structure is:

employee_master_id | emp_name | modified_date
-------------------+----------+----------------------------------
1                  | Bala     | 2017-05-18 11:51:46.721182+05:30
2                  | murugan  | 2017-05-21 15:59:11.443901+05:30

Please help me resolve this issue.

Answer

As well as ValueToKey, you need ExtractField to convert the key from a struct to a plain field:

transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id    
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=employee_master_id
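To see why the second transform matters, here is a minimal sketch (plain Python, not the real Kafka Connect API) of what the two SMTs do to a record's key. ValueToKey copies the listed value fields into the key as a struct; with only that transform, the key is still a struct, which is exactly what the sink rejects with "STRUCT is not supported as the document id". ExtractField$Key then replaces the struct key with the single named field, leaving a primitive the sink can use as the document _id. The dict-based record shape below is an assumption for illustration only:

```python
def value_to_key(record, fields):
    """Sketch of ValueToKey: copy the listed value fields into the key as a struct."""
    record = dict(record)
    record["key"] = {f: record["value"][f] for f in fields}
    return record

def extract_field_from_key(record, field):
    """Sketch of ExtractField$Key: replace the struct key with one of its fields."""
    record = dict(record)
    record["key"] = record["key"][field]
    return record

record = {"key": None, "value": {"employee_master_id": 1, "emp_name": "Bala"}}

record = value_to_key(record, ["employee_master_id"])
# key is now the struct {"employee_master_id": 1} -> the sink would throw DataException here

record = extract_field_from_key(record, "employee_master_id")
print(record["key"])  # 1 -> a plain value, usable as the Elasticsearch document _id
```

With `key.ignore` left false, the sink uses this record key as the document ID, so each row is indexed (and re-indexed on updates) under its `employee_master_id`.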

