Kafka Connect: No tasks created for a connector
Problem description
We are running Kafka Connect (Confluent Platform 5.4, i.e. Kafka 2.4) in distributed mode using the Debezium (MongoDB) and Confluent S3 connectors. When we add a new connector via the REST API, the connector is created in the RUNNING state, but no tasks are created for it.
Pausing and resuming the connector does not help. When we stop all workers and then start them again, the tasks are created and everything runs as it should.
The issue is not caused by the connector plugins, because we see the same behaviour for both Debezium and S3 connectors. Also in debug logs I can see that Debezium is correctly returning a task configuration from the Connector.taskConfigs() method.
Can somebody tell me what to do so we can add connectors without restarting the workers? Thanks.
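One way to confirm the symptom described above is to ask the Connect REST API for each connector's status and look for connectors that report RUNNING with an empty task list. The following is a minimal Python sketch under stated assumptions, not a definitive tool: the function names are made up for illustration, `base_url` is a placeholder for one of the workers (port 10083 is taken from the worker configuration in the question), and it relies only on the `GET /connectors` and `GET /connectors/{name}/status` endpoints.

```python
import json
import urllib.request
from urllib.parse import quote


def has_no_tasks(status):
    """True if a status document reports a RUNNING connector with zero tasks."""
    connector_state = status.get("connector", {}).get("state")
    return connector_state == "RUNNING" and not status.get("tasks")


def fetch_json(url):
    """GET a URL and decode the JSON body."""
    with urllib.request.urlopen(url) as response:
        return json.load(response)


def connectors_without_tasks(base_url):
    """Return the names of RUNNING connectors that have no tasks.

    base_url is an assumption, e.g. "http://<connect_host>:10083".
    """
    stuck = []
    for name in fetch_json(f"{base_url}/connectors"):
        # Percent-encode the name: the connector names in the question
        # contain "|", which is not a URL-safe character.
        encoded = quote(name, safe="")
        status = fetch_json(f"{base_url}/connectors/{encoded}/status")
        if has_no_tasks(status):
            stuck.append(name)
    return stuck
```

Calling `connectors_without_tasks("http://<connect_host>:10083")` right after creating a connector, and again after a worker restart, would show whether the task list is still empty.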
Configuration details
The cluster has 3 nodes with the following connect-distributed.properties:
bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5
config.storage.topic=connect-configs-qa
config.storage.replication.factor=3
status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083
plugin.path=/opt/kafka-connect/plugins,/usr/share/java/
security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>
max.request.size=20000000
max.partition.fetch.bytes=20000000
Connector configuration
Debezium example:
{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
    "mongodb.name": "qa-debezium-comp",
    "mongodb.ssl.enabled": true,
    "collection.whitelist": "converter[.]task",
    "tombstones.on.delete": true
  }
}
S3 example:
{
  "name": "qa-s3-sink-task|1",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "qa-debezium-comp.converter.task",
    "topics.dir": "data/env/qa",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "<bucket-name>",
    "flush.size": "15000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "transforms": "ExtractDocument",
    "transforms.ExtractDocument.type": "custom.kafka.connect.transforms.ExtractDocument$Value"
  }
}
The connectors are created using curl:
curl -X POST -H "Content-Type: application/json" --data @<json_file> http://<connect_host>:10083/connectors
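For scripted setups, the same request can be issued from Python. This is a rough equivalent of the curl command above using only the standard library; the function names are hypothetical, and `base_url` stands in for `http://<connect_host>:10083`. Building the request separately from sending it makes it possible to inspect the method, URL, and body before anything reaches the worker.

```python
import json
import urllib.request


def build_create_request(base_url, payload):
    """Build the POST /connectors request that the curl command sends.

    base_url is a placeholder such as "http://<connect_host>:10083".
    payload is the parsed connector JSON ({"name": ..., "config": {...}}).
    """
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(base_url, json_path):
    """Read a connector definition from disk and submit it to a worker."""
    with open(json_path, encoding="utf-8") as handle:
        payload = json.load(handle)
    request = build_create_request(base_url, payload)
    with urllib.request.urlopen(request) as response:
        # On success the worker echoes the connector name, config and tasks.
        return json.load(response)
```

The `tasks` field of the returned document is worth checking, although an empty list immediately after creation may simply mean the tasks have not been assigned yet.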
Recommended answer
I had the same problem, so I changed the name of the connector and created a new one. It worked, but I don't know the source of the problem because there was no information in the Kafka Connect logs.
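The rename workaround can be sketched in Python as follows. This is only an illustration under assumptions: the helper names and the replacement name `qa-s3-sink-task-v2` are invented, the answer does not say which name was used or why renaming helped, and deleting the old connector afterwards is an extra step the answer does not mention. The old name is percent-encoded because `|` is not a URL-safe character.

```python
import copy
from urllib.parse import quote


def with_new_name(payload, new_name):
    """Return a copy of a connector definition under a different name."""
    renamed = copy.deepcopy(payload)
    renamed["name"] = new_name
    return renamed


def delete_url(base_url, old_name):
    """URL for DELETE /connectors/{name}, with the name percent-encoded."""
    return f"{base_url}/connectors/{quote(old_name, safe='')}"


original = {
    "name": "qa-s3-sink-task|1",
    "config": {"connector.class": "io.confluent.connect.s3.S3SinkConnector"},
}
# "qa-s3-sink-task-v2" is a made-up name for illustration only.
replacement = with_new_name(original, "qa-s3-sink-task-v2")
print(replacement["name"])
print(delete_url("http://<connect_host>:10083", original["name"]))
```

The `replacement` document can then be posted to `/connectors` in the same way as the original definition.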