Kafka Connect topic.key.ignore not working as expected
Problem description
As I understand from the Kafka Connect documentation, this configuration should ignore the keys for the metricbeat and filebeat topics but not for alarms. However, Kafka Connect does not ignore any key.
Here's the full JSON config that I'm pushing to Kafka Connect over REST:
{
"auto.create.indices.at.start": false,
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch:9200",
"connection.timeout.ms": 5000,
"read.timeout.ms": 5000,
"tasks.max": "5",
"topics": "filebeat,metricbeat,alarms",
"behavior.on.null.values": "delete",
"behavior.on.malformed.documents": "warn",
"flush.timeout.ms":60000,
"max.retries":42,
"retry.backoff.ms": 100,
"max.in.flight.requests": 5,
"max.buffered.records":20000,
"batch.size":4096,
"drop.invalid.message": true,
"schema.ignore": true,
"topic.key.ignore": "metricbeat,filebeat",
"key.ignore": false
"name": "elasticsearch-ecs-connector",
"type.name": "_doc",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms":"routeTS",
"transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.topic.format":"${topic}-${timestamp}",
"transforms.routeTS.timestamp.format":"YYYY.MM.dd",
"errors.tolerance": "all" ,
"errors.log.enable": false ,
"errors.log.include.messages": false,
"errors.deadletterqueue.topic.name":"logstream-dlq",
"errors.deadletterqueue.context.headers.enable":true ,
"errors.deadletterqueue.topic.replication.factor": 1
}
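For reference, this is roughly how I push that config over REST (a sketch, assuming the Connect REST listener is on localhost:8083 and the JSON above is saved as connector-config.json):

curl -s -X PUT http://localhost:8083/connectors/elasticsearch-ecs-connector/config \
  -H 'Content-Type: application/json' \
  -d @connector-config.json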
Here's the logging during startup of the connector:
[2020-05-01 21:07:49,960] INFO ElasticsearchSinkConnectorConfig values:
auto.create.indices.at.start = false
batch.size = 4096
behavior.on.malformed.documents = warn
behavior.on.null.values = delete
compact.map.entries = true
connection.compression = false
connection.password = null
connection.timeout.ms = 5000
connection.url = [http://elasticsearch:9200]
connection.username = null
drop.invalid.message = true
elastic.https.ssl.cipher.suites = null
elastic.https.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
elastic.https.ssl.endpoint.identification.algorithm = https
elastic.https.ssl.key.password = null
elastic.https.ssl.keymanager.algorithm = SunX509
elastic.https.ssl.keystore.location = null
elastic.https.ssl.keystore.password = null
elastic.https.ssl.keystore.type = JKS
elastic.https.ssl.protocol = TLS
elastic.https.ssl.provider = null
elastic.https.ssl.secure.random.implementation = null
elastic.https.ssl.trustmanager.algorithm = PKIX
elastic.https.ssl.truststore.location = null
elastic.https.ssl.truststore.password = null
elastic.https.ssl.truststore.type = JKS
elastic.security.protocol = PLAINTEXT
flush.timeout.ms = 60000
key.ignore = false
linger.ms = 1
max.buffered.records = 20000
max.in.flight.requests = 5
max.retries = 42
read.timeout.ms = 5000
retry.backoff.ms = 100
schema.ignore = true
topic.index.map = []
topic.key.ignore = [metricbeat, filebeat]
topic.schema.ignore = []
type.name = _doc
write.method = insert
I'm using Confluent Platform 5.5.0.
Recommended Answer
Let's recap here, because there have been several edits to your question and problem statement :)
- You want to stream multiple topics to Elasticsearch with a single connector
- For some topics you want to use the message key as the Elasticsearch document ID, while for the others you want to use the Kafka message coordinates (topic+partition+offset) instead
- You're trying to do this with the key.ignore and topic.key.ignore settings
Here's my test data in three topics, test01, test02, test03:
ksql> PRINT test01 from beginning;
Key format: KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:08:32.441 Z, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:08:32.594 Z, key: Y, value: {"COL1": 2, "COL2": "BAR"}
ksql> PRINT test02 from beginning;
Key format: KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:08:50.865 Z, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:08:50.936 Z, key: Y, value: {"COL1": 2, "COL2": "BAR"}
ksql> PRINT test03 from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:16:15.166 Z, key: <null>, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:16:46.404 Z, key: <null>, value: {"COL1": 2, "COL2": "BAR"}
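If you want to reproduce this test data, keyed records can be written with the console producer (a sketch, assuming a broker on localhost:9092; the values here are plain JSON strings rather than the Avro I used):

kafka-console-producer --bootstrap-server localhost:9092 --topic test01 \
  --property parse.key=true \
  --property key.separator=:
>X:{"COL1": 1, "COL2": "FOO"}
>Y:{"COL1": 2, "COL2": "BAR"}

For test03, omit the two key properties so that the records are produced with a null key.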
With this data I create a connector (I'm using ksqlDB but it's the same as if you use the REST API directly):
CREATE SINK CONNECTOR SINK_ELASTIC_TEST WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'type.name' = '_doc',
'topics' = 'test02,test01,test03',
'key.ignore' = 'false',
'topic.key.ignore'= 'test02,test03',
'schema.ignore' = 'false'
);
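Before querying Elasticsearch it's worth confirming that the connector and its task are RUNNING (a sketch, assuming the Connect REST listener is on localhost:8083):

curl -s http://localhost:8083/connectors/SINK_ELASTIC_TEST/status | \
  jq -c '[.connector.state, .tasks[].state]'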
The resulting indices are created and populated in Elasticsearch. Here are the indices and document IDs of the documents:
➜ curl -s http://localhost:9200/test01/_search \
-H 'content-type: application/json' \
-d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test01","Y"]
["test01","X"]
➜ curl -s http://localhost:9200/test02/_search \
-H 'content-type: application/json' \
-d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test02","test02+0+0"]
["test02","test02+0+1"]
➜ curl -s http://localhost:9200/test03/_search \
-H 'content-type: application/json' \
-d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test03","test03+0+0"]
["test03","test03+0+1"]
So key.ignore is left at its default (false) and is in effect for test01, which means that the message key is used as the document ID.
Topics test02 and test03 are listed in topic.key.ignore, which means that the key of the message is ignored (i.e. in effect key.ignore=true), and thus the document ID is the topic/partition/offset of the message.
Given that I've proven out above that this does work, I would recommend that you start your test again from scratch to double-check your work.
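One way to reset is to delete the connector and the indices it wrote before re-creating everything (a sketch; adjust hosts and index names to your environment):

# remove the connector definition from Kafka Connect
curl -s -X DELETE http://localhost:8083/connectors/elasticsearch-ecs-connector
# remove the indices that the connector wrote
curl -s -X DELETE 'http://localhost:9200/test01,test02,test03'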